# <font face="times"><font size="6pt"><p style = 'text-align: center;'> BRYN MAWR COLLEGE

<font face="times"><font size="6pt"><p style = 'text-align: center;'><b>Computational Methods in the Physical Sciences</b>
<br/><br/>
    
<p style = 'text-align: center;'><b>Module 15: Parallel Computing</b><br/><br/> 

   ***Prerequisite modules:*** Module 0, Module 1
   
   ***Estimated completion time:*** 1.5-4 hours
   
   ***Learning objectives:*** To learn some basic concepts and terminology related to parallel computing, to become familiar with some of the common Python packages used for parallel processing, and to see how those packages can be used to parallelize a couple of example applications.

<img src = "./Images/Code_quality_xkcd.png" width=600>

<center>(Image credit: https://xkcd.com/1513)</center>

This module presents the basics of ***parallel computing***, which makes use of the multiple processors common in today's computers to speed up certain types of computation.  For large-scale computations, parallel processing techniques can spread the computational load among multiple separate computers.

### <font color="blue">Scientist Profile</font>

Anousheh Ansari, born September 12, 1966, is an Iranian-American engineer and co-founder and chairwoman of Prodea Systems, an internet-of-things services company. She previously served as co-founder and CEO of Telecom Technologies, Inc., a maker of software used for switching data streams within telecommunications networks.  She received a B.S. degree in electrical engineering and computer science from George Mason University, and a master's degree in electrical engineering from George Washington University.  She is the CEO of the XPRIZE Foundation, which funds competitive awards to encourage development of solutions to global challenges.  The \\$10 million Ansari XPRIZE, funded by Anousheh and her family, spurred the growth of the commercial spaceflight industry, helping to stimulate the launches of SpaceX, Blue Origin, and Virgin Galactic.  Ansari also co-founded The Billion Dollar Fund for Women, with the goal of investing $1 billion in women-founded companies by 2020.  On September 18, 2006, a few days after her 40th birthday, Ansari became the first Iranian in space and the fourth self-funded space tourist, as well as the first self-funded woman to fly to the International Space Station.  

<img src ='./Images/Anousheh_Ansari.jpg' width=300>

<center>(Image credit: NASA, Public Domain, via Wikipedia, https://en.wikipedia.org/wiki/Anousheh_Ansari#/media/File:AnoushehAnsari.jpg)

### <font color="blue">Introduction</font>

The basic idea behind parallel computing is to split up computations into independent parts that can be sent to different processors within a single computer, or to different computers in a connected ***cluster***, for more-or-less *simultaneous* processing.  This process is called ***parallelization***.   

A bit of terminology: the term ***core*** (sometimes, ***compute core*** or ***computing core***) used to mean the same thing as ***CPU*** (Central Processing Unit)  Today, most CPUs consist of multiple cores that can work on separate tasks.  Further, cores can be split using "simultaneous multi-threading" or "hyperthreading" into "virtual cores," called "threads," that also can perform separate tasks.  For example, the author's laptop has 8 cores but 16 "logical processors" in its CPU, meaning it has two virtual cores for each actual core.  Note that the meaning of the term "thread" here is not exactly the same as its meaning below, but the two meanings are similar.  

Modern computers also contain ***GPU***s (Graphics Processing Units) which support the display of images and video on a monitor.  The video-heavy nature of computer games has prompted the development of increasingly powerful GPUs in recent years.  They are designed to perform matrix operations very efficiently, and are inherently parallel, so they are increasingly being used for parallel processing tasks. 

Not all computational tasks are parallelizable, and those that are generally can be only partially parallelized.  If a computation can be at least partially parallelized, the time to complete it might be reduced as a result.  This is not always the case, however: there can be a significant amount of computational "overhead" involved in coordinating the multiple CPUs (or GPUs, or cores) to work together properly and possibly exchange data, and the time spent by the computer managing that coordination could exceed the time saved by parallelization.

The sorts of computations that benefit greatly from parallelization include those involving lots of interacting "parts" whose individual behaviors can be separated sufficiently from those of the other parts.  In the scientific/engineering realm, those parts could be particles, fields, stars, etc., and tasks for which parallel computing is used include modeling of: the climate,  the motions of stars in galaxies, air flow around moving vehicles, traffic flows, and plasma in fusion machines.

A given computer system can be categorized in one of a number of different parallel computing architectures depending on whether only data, or only instructions (for what to do with the data), or both are parallelized (i.e., split among different processors).  Lawrence Livermore National Laboratory has a nice, mostly nontechnical tutorial on parallel computing here: https://hpc.llnl.gov/documentation/tutorials/introduction-parallel-computing-tutorial .

That website provides a simple example of a computation that *can* be parallelized: determining the potential energies of the various possible shapes (or "conformations") of a molecule in order to find the lowest-energy conformation overall.  Since the energies of the different conformations are independent of each other, they can be computed by different CPUs without the need for any coordination between them.  

A simple example of a computation that *cannot* be parallelized usefully is determination of the Fibonacci numbers using the defining relation $F(n) = F(n-1) + F(n-2)$, where each number depends on the previous two.  In this case, splitting the computation of $F(n-2)$ off in one CPU and $F(n-1)$ in another doesn't really help, since each of those depend themselves on earlier numbers in the sequence which would have to be computed first, presumably by other CPUs.  For $n$ greater than the number of CPUs available, the computation would run out of CPUs before finding $F(n)$.  However, the task *can* be parallelized using a different algorithm (Binet's formula): $F(n) = \left[ \phi^n - (-\phi)^{-n} \right] \big/ \sqrt{5}$, where $\phi \equiv \left( 1 + \sqrt{5} \right) \bigm/ 2$.  In this case, one could have $\phi^n$ and $-\phi^{-n}$ computed by different cores and then combined to get $F(n)$.  The moral of this example is that just because a problem cannot be parallelized via one approach doesn't mean it cannot be parallelized at all!

The procedure required for parallelizing a particular task is specific to that task, and depends on the degree to which the multiple tasks or cores involved need to coordinate with each other.  In one programming paradigm, parallel tasks that need to share data will have access to a common region of memory in the computer (this is the ***shared memory*** paradigm); in this case, if the tasks can change the data then access to it must be controlled carefully to avoid one task altering data inappropriately for another.  In another paradigm, parallel tasks will exchange the data that needs to be shared: this is called ***message passing***, and is accomplished using a ***message passing interface***, or ***MPI***.  The Python version of MPI is ***`mpi4py`*** (see https://pypi.org/project/mpi4py/).  (Similar interfaces exist for other programming languages.)  A message passing approach is indispensible for parallel computations involving large numbers of cores (approx. 1000 or more).  For a brief, readable introduction to MPI, see **Effective Computation in Physics**, by A. Scopatz and K. Huff, O'Reilly Media Inc., 2015.  (That text also discusses several approaches to parallelization of a more physically-based example problem than those considered below: the gravitational N-body problem.)

Parallel computing is a complicated topic, and this module will present only some simple examples of parallelization.  For all but the simplest tasks, the right parallelization approach to use is highly dependent on the particular problem to be solved and the computing resources available. 

Before getting into some of the Python approaches to parallelization, it is necessary to introduce some commonly used terms.

#### Terminology

* A ***process*** is an instance of a running program, e.g. a running Jupyter notebook, a Microsoft Word document being edited, etc.  Other "background" processes are being run by your computer's operating system all the time.  You can see them on a Windows machine using the Task Manager, on a Mac using the Activity Monitor, and on a Linux/Unix machine using `top` from a command line.  Note that each process has its own ID number called the Process ID or ***PID***.


* A ***thread*** is a process within a process.  Processes can have multiple threads.  *Threads share data, so changing the value of some variable in one thread will change it in all the others*.  *That's not the case for processes*.  


* A ***parent*** process is the "base" from which other processes can be created.  There is just one parent process: the Python ***interpreter***, which translates your code into actions by the computer.  


* A ***child*** process (sometimes called a ***subprocess***) is one that has come from the parent process.  

     Child processes can be created in one of two main ways.  The differences are technical and not something to worry about at this point, but you may encounter the terms, so it might be helpful to have seen at least basic definitions.

     * ***Forking***: the child process essentially is a copy of the parent process and it "inherits" most of the attributes of the parent process.  

     * ***Spawning***: the child process is a "fresh" one, which inherits minimal attributes from the parent.

Parallel computing can, in general, be done using either parallel processes or parallel threads.  In Python, though, there's a complication with threading due to a technical feature of the language called the ***GIL*** (for Global Interpreter Lock).  It effectively prevents two threads from executing simultaneously to avoid conflicting actions.  That means that threading won't speed up a task that's ***CPU-bound***; i.e. one whose speed is limited by the speed of the CPU.  However, threading *can* speed up a task if what causes it to be slow is not the demand on the CPU, but the rate at which data must be transferred, since each thread can manage some of the data transfer.  (Such a task is said to be ***I/O-bound***; "I/O" is shorthand for Input/Output.)  So, if your task does not put a lot of stress on the CPU but does involve a lot of data input/output, multi-threading might help speed it up.  If it does place a heavy demand on the CPU, as much scientific computing would do, then multi-processing would be the right approach to speed it up in Python.

Some of the packages discussed below explicitly use terms like `delayed` and `futures`.  These refer to the way some of the package functions work: they define operations that are to be executed in a delayed fashion some time in the future, i.e., not right after the user's task is defined.  Instead, those user-defined tasks are distributed to cores to be run according to a schedule designed to optimize performance. 

> **Note**: With some of the Python packages for parallel processing there can be problems getting return values from user-defined functions when working in a Jupyter notebook.  The examples below mostly avoid those problems, but it might be the case that the code is not as efficient or elegant as it could be because of that.  Additionally, some of the features of the packages discussed below simply do not work in a notebook.  If you need to use those packages, you are likely to have more success working with Python script files in a code development environment like Spyder (bundled with Anaconda's Python distribution).

This module will look at five Python packages for parallel processing: `multiprocessing` to use multiple processes; `concurrent.futures`, with methods to create parallel threads or processes (based on `multiprocessing` and its multithreading cousin `threading`); `joblib`, which also can do both; `ipyparallel`, designed for Jupyter notebooks; and `dask`, which integrates closely with `numpy`, `pandas` and `scikit-learn`.  [`dask`, `ray` (https://ray.io/), `bodo` (https://docs.bodo.ai/2023.1/), and other parallel processing packages are designed mainly for large data science projects.]  Brief mention also will be made of `CUDA`, which allows a programmer to make use of GPUs for computation.  Different packages in this group might be more or less helpful, depending on the application.

## <font color="blue">15.1 `multiprocessing/multiprocess`</font>

(This section is based partly on https://www.sitepoint.com/python-multiprocessing-parallel-programming/ and https://pythonnumericalmethods.berkeley.edu/notebooks/chapter13.02-Multiprocessing.html .)

The main Python package for creating parallel *processes* is **`multiprocessing`** (https://docs.python.org/3/library/multiprocessing.html).  It works when run from an integrated development environment like Spyder (which comes with the Anaconda distribution of Python) or by command line from a terminal, but it turns out that it does not run properly inside a Jupyter notebook: results produced by `print` and `return` statements do not display in the notebook. 

Fortunately, the Python community has created a variant of `multiprocessing` called **`multiprocess`** which does work correctly in a notebook (see https://pypi.org/project/multiprocess/).  All of the function names in that package are the same as those in `multiprocessing`, so any code you create or find that uses the `multiprocessing` package should run in a notebook if you import `multiprocess` instead of `multiprocessing`.  No other changes should be needed (in theory, anyway).

So, we start by importing `multiprocess` as `mp`, and `time` to do timing of computations:

In [3]:
import multiprocess as mp

import time

The first thing to do is identify the number of cores in the machine you're using, by executing the code cell below:

In [4]:
num_cores = mp.cpu_count()

print(f"Number of Cores = {num_cores}")

Number of Cores = 16


### <font color="blue">15.1.1 -- Example 1: Random Number Squaring</font>

As a simple demonstration of how parallel processing can speed up a computation, let's create a function that will create and square a random number based on a random seed supplied as an argument:

In [8]:
def random_square(myseed):
    from numpy.random import seed, randint  # Must be imported inside function to be available to child processes
    
    seed(myseed)                            # set the seed to be the argument to the function
    random_num = randint(0, 10)             # return a random number between 0 and 10 based on the seed
    
    return random_num**2

Check what this does:

In [16]:
results = []       # list to store results

for i in range(10):
    results.append(random_square(i))    # add output of random_square for seed "i"
    
print(results)

[25, 25, 64, 64, 49, 9, 81, 16, 9, 25]


To emphasize, `random_square(i)` isn't squaring the argument `i`, it's squaring the random number obtained from `randint` using `i` as a seed.  Since the output range for `randint` has been limited to `0, 1, ..., 9`, the output of `random_square` is limited to `[0, 1, 4, 9, ..., 64, 81]`.

This calculation of squares clearly is parallelizable, since each number can be squared independently of the others.

Now we'll call this function a large number of times with different seeds, store the results in a list, and time the process.  Because the `multiprocess` approach involves using a list comprehension to compute the function output from a set of seeds, we'll use a similar approach here to make the timing comparison fair.  

(List comprehensions were mentioned very briefly in Module 1.  They are a compact way of creating a new list from another one based on some condition.  They have the general form `newlist = [expression for item in iterable if condition == True]`.  For example, if `fruits` is a list of fruits, then `newlist = [x for x in fruits if "a" in x]` will pick out the fruits with the letter "a" in their names.  Here, the first `x` is the expression -- in this case, it's just picking out an element, but it could involve more complicated actions.  The second `x` is the "item" and the list `fruits` is the ***iterable*** (a quantity whose elements can be stepped through one by one using an index: strings, lists, and arrays are examples).  Finally, `"a" in x` is the condition that must be true for the expression to be executed.)

In [17]:
t0 = time.time()                                        # initial time
    
results = [random_square(i) for i in range(1000000)]    # list comprehension uses outputs of range as seeds for random_square
    
print(f"First five and last five results: {results[0:5]}, ..., {results[-5:]}") 

t1 = time.time()                                        # time at end of process

print(f"Execution time = {t1 - t0:.2f} sec.")

First five and last five results: [25, 25, 64, 64, 49], ..., [81, 36, 1, 1, 1]
Execution time = 5.40 sec.


The list comprehension beginning with `results` provides `i` (which ranges from `0` to the number specified in `range` minus `1`) to the function `random_square(i)`.  `results` then is a list of squared integers, with a length equal to the argument of `range`.

Running the function on the author's laptop for one million seeds takes about 5 seconds.

Now we'll do the same thing using a parallel approach.  This involves three steps: 

1. First, ensure we're in the parent process, which is accomplished with the line `if __name__ == "__main__":` that starts off the section of code that calls `multiprocess` functions.  
  
2. Next, with the `Pool()` function from the `multiprocess` package create a "pool" of processes (often called ***worker processes***), in this case equal to the number of cores in the machine (the number could be smaller but should not be larger).  
  
3. Finally, to distribute the calculations to the available cores we call the **`pool.map()`** function, supplying our random number function and the set of seeds to apply it to as arguments.  Because those seeds are expected to be provided in the form of a *set* of values that can be iterated over, like the output of `range` (rather than as individual numbers), `pool.map()` is applied to our function within a list comprehension (rather than within a loop).  

In [18]:
t0 = time.time()                                                     # initial time

if __name__ == "__main__":                                           # check that we're in the parent process
    pool = mp.Pool(processes=num_cores)                              # create set of worker processes
    results = pool.map(random_square, [i for i in range(1000000)])   # list comprehension for constructing list of results

print(f"First five and last five results: {results[0:5]}, ..., {results[-5:]}") 

t1 = time.time()                                                     # time at end of process

print(f"Execution time = {t1 - t0:.2f} sec.")

First five and last five results: [25, 25, 64, 64, 49], ..., [81, 36, 1, 1, 1]
Execution time = 2.43 sec.


The key line above is `results2 = pool.map(random_square, [i for i in range(5000000)])`, in which the `pool.map()` function "maps" the tasks (in this case, the computations using `random_square`) to the pool of processes created by the `mp.Pool()` function used in the preceding line.  The output values returned by `random_square` are stored in `results2`, as shown by the output of the `print()` function, which match those for the non-parallelized computation.

**NOTE**: While the code above will run without the line `if __name__ == "__main__":`, it is necessary in general when using `multiprocess(ing)` functions in order to avoid the possibility of an infinite sequence of function calls.  (That line ensures that the `multiprocess(ing)` functions are being called from within the parent process, not a child process.)  Note that the calls to functions in `multiprocess(ing)` are within an indented block following that line.  If you need to put a lot of code in that block, you can instead put it in its own function, and then just call that function (again, indented) after the `if __name__ == "__main__":` line.

On the author's computer, which has 16 cores, this approach takes about 40% as long as the non-parallelized approach.  Not a bad degree of improvement for a couple of simple changes to the code!  (The speed does not improve by a factor of 16 at least in part, and maybe entirely, because of the overhead involved in coordinating the activities of the cores, as mentioned earlier.)

If you're running this notebook on a Windows computer with a *local* installation of jupyter (i.e., not on a remote server), you can see the CPU usage while the code cells are executing by opening the Resource Monitor (type that name into the Search field on the Taskbar) and clicking on the CPU tab.  In the non-parallelized case, you may see a few of the CPUs trading off effort; in the parallelized case, you (probably) will see most or all of the CPUs working at the same time.

### <font color="blue">15.1.2 -- Example 2: Checking if a Number is Prime</font>

The parallelization approach used above worked just fine for the simple squaring application because the application was inherently parallel: squaring each number can be done independently of squaring all the others.  (Such problems sometimes are called "embarrassingly parallel"!)  Most applications will not be so simple to parallelize, so here we consider one for which we have to modify the code in order to parallelize the task.

This example involves a function to determine if a number provided as an argument is prime.  Here's the function:

In [19]:
def is_prime(n):
    '''Checks if 'n' is prime.'''
    
    # Restrict to positive integers
    if (not isinstance(n, int)) or (n < 1):
        raise ValueError("Input should be a positive integer.\n") 

    # Check special cases
    if n == 1:
        print("1 is not prime.")       # n = 1 has only one factor
        return False       
    if n == 2:
        print("2 is prime.")
        return True
    if n % 2 == 0:
        print("Not prime: 2 is a factor.")
        return False
    
    # Check for factors, from 3 up to the square root of n for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            print("Not prime:", x, "is a factor.")
            return False
    print(n, "is prime.")


The code first checks if the number is 1, 2, or even, in which case the answer can be provided immediately.  (The check for "evenness" is done using the modulo operator, `%`, which you might remember from Module 1.  That operator provides the remainder left when one integer is divided by another.)  After that, the code loops through numbers from 3 up to the square root of the number of interest (rounded up to an integer), which is the largest a prime factor could be.  (The restriction to *positive* integers is not required, but is done for simplicity in this example.)

Let's time the code on a known large prime number.  (Be patient: this might take more than half a minute, depending on the computer you're running it on.)

In [20]:
import time

t0 = time.time()
is_prime(489133282872437279)
t1 = time.time()

print("Execution time =", t1 - t0, "sec.")

489133282872437279 is prime.
Execution time = 28.33681082725525 sec.


On the author's laptop, this takes roughly 27-28 seconds.  

Now if we try to use the same parallelization approach we did for the `random_square` function, we get an error message:

In [10]:
t0 = time.time()                                              # initial time

if __name__ == "__main__":
    pool = mp.Pool(processes=num_cores)                       # create set of worker processes
    results = pool.map(is_prime, 489133282872437279)          # list comprehension for constructing list of results

    
print("First five results: ", results[0:5])                   # first few results

t1 = time.time()                                              # time at end of process

print(f"Execution time = {t1 - t0:.2f} sec.")

TypeError: 'int' object is not iterable

The `'int' object is not iterable` message is telling us that we're not providing any iterable, like a counter `i`, that can be stepped through that will enable the `pool.map()` to distribute calculations to the worker processes, one for each value of `i`.  The only parameter our `is_prime()` function takes is the number $n$ whose "primeness" is to be determined, 489133282872437279 in the example.  (For the `random_square(i)` function, the argument `i` is the iterable.)

However, there is a `for` loop inside the code that could be parallelized: instead of having one core check all the possible prime factors from $3$ to $\sqrt{n}$, we could break up the interval spanned by the loop into smaller subintervals.  For example, if we wanted to distribute the calculation over two cores, one core could check the integers from $3$ to $\sqrt{n}/2$ and the other could check the rest of the integers up to $\sqrt{n}$.

We will have to slightly modify the code to enable the "sub-looping" described above.  The easiest way probably is to provide the function with the minimum and maximum integers we want the loop to cover.  Renaming the parallelizable form of the function `is_primeP`, we have:

In [17]:
def is_primeP(n, a, b):
    '''Checks if 'n' has prime factors between 'a' and 'b'.
       Returns "False" if prime factor found, "True" otherwise.'''
    
    # Restrict arguments to positive integers
    if (not isinstance(n, int)) or (n < 1):
        raise ValueError("Input should be a positive integer.\n") 
        
    if (not isinstance(a, int)) or (a < 0) or (not isinstance(b, int)) or (b < 0) or (b < a):
        raise ValueError("Limits should positive integers, with b > a.\n")

        
    # Check special cases
    if n == 1:
        print("1 is not prime.")       # n = 1 has only one factor
        return False       
    if n == 2:
        print("2 is prime.")
        return True
    if n % 2 == 0:
        print("Not prime: 2 is a factor.")
        return False
    
    
    # Adjust a & b if not odd
    if a % 2 == 0:
        a = a - 1
    if b % 2 == 0:
        b = b + 1
        
        
    # Check for factors, for odd numbers from a to b 
    for x in range(a, b+1, 2):
        if n % x == 0:
            return False
    return True


What will the limits `a` and `b` of the sub-loops be?  It depends on how many cores we run on.  Let `b` $-$ `a` $= \Delta$ and `numCores` $= N$.  Then, for integers spanning interval $\Delta$ and divided into $N$ subintervals, the boundaries of the subintervals would be roughly `a`, `a`$+\Delta/N$, `a`$+2\Delta/N$, ..., `b`.  This is only roughly correct since $\Delta/N$ generally won't be an integer, so we should round it up or down.  Since the different worker processes don't have to work with loops of exactly the same size, but we want to ensure that the subintervals span the full range up to $\sqrt{n}$, we should round up.  (This might mean that the subinterval at the `b` end is shorter than the others, but that doesn't matter.)  

With all that in mind, let's set up the parallelization code.  Because the `pool.map()` function accepts only a simple iterable, *one* item from which will be supplied to each call of our `is_primeP()` function, it won't do what we want, since we now need to provide *three* inputs to each call of our function (the number whose "primeness" we're checking, and the range limits of a subinterval over which to look for prime factors).  Fortunately, the `multiprocess`/`multiprocessing` packages have another function that will do what we need, called **`pool.starmap()`**.  It will take an argument that's an iterable and whose elements also are iterables.  To give a simple example, while the iterable that we provided to `pool.map()` for the `random_square` function essentially was the list `[1, 2, 3, ..., 1000000]`, we can provide `pool.starmap()` with an iterable list like `[(n, a1, b1), (n, a2, b2), ...]`, where `a1` and `b1` are the range limits of the first subinterval checking for prime factors of `n`, `a2` and `b2` are the range limits of the second subinterval, etc.  The sets `(n, a1, b1)` will be created as tuples, but could be lists, since both are iterable.

We will write a separate function to find the subinterval limits because it will be used several times in this module.  

In [8]:
def subint_limits(num, num_cores):
    '''Create list of tuples of range limits for finding possible prime factors of "num."'''
    
    from numpy import append, ceil, sqrt
    
    subint_range = int(ceil(sqrt(num)/num_cores))                # range of each subinterval ("ceil" rounds up)

    # Create set of iterables to provide to is_primeP()
    range_limits = []
    for i in range(3, subint_range*num_cores, subint_range):
        range_limits.append((num, i, int(i+subint_range)))       # iterable with elements of form (n, a1, b1)
        
    return range_limits

Now we use it to provide the needed inputs to `is_primeP`, which is called by `pool.starmap()`:

In [24]:
import time
import multiprocess as mp

t0 = time.time()                                       # initial time

num = 489133282872437279                               # number to check for primeness

num_cores = mp.cpu_count()

limits = subint_limits(num, num_cores)                 # get limits of subintervals
    
if __name__ == "__main__":
    pool = mp.Pool(processes=num_cores)                # create set of worker processes
    results = pool.starmap(is_primeP, limits)          # distribute subloops to workers

# Check results: if all subloops report True, then "num" is prime, otherwise it's not
if all(results):
    print("Number is prime.")
else:
    print("Number is not prime.")

t1 = time.time()                                       # time at end of process

print(f"Execution time = {t1-t0:.2f} sec.")

Number is prime.
Execution time = 4.14 sec.


On the author's computer, for the prime number 489133282872437279 (see the "List of prime numbers" on Wikipedia), the parallelized approach is around 7 times faster than the non-parallelized one.  This speedup won't happen for every input number, even in the case of very large ones. 

<font color="green"><b>Breakpoint 1</b></font>: If a non-prime number's smallest prime factor lies within the first subinterval, would you expect the parallelized version of the prime-finding code to be faster than the non-parallelized version?  If not, why not?

## <font color="blue">15.2 `concurrent.futures`</font>

As noted earlier, multithreading might save time on an I/O-bound task, but probably will not save much, if any, on a CPU-bound task.  Here we'll use the multithreading capability of **`concurrent.futures`**, available as part of Python since version 3.2.  It "runs on top of" `multiprocessing` (i.e., it uses the functions of that package).  (This website might be helpful for providing context, https://towardsdatascience.com/python-concurrency-concurrent-futures-15b56dc9a14d, but note that it uses Python script files, not Jupyter notebooks.)

### <font color="blue">15.2.1 Random Number Squaring</font>  
The code will include a slightly modified version of the `random_square()` function that includes a line of code to make it wait for a short time to simulate data I/O and artificially make the code behave as if it were I/O-bound.

In [25]:
def random_squareT(myseed):
    from numpy.random import seed, randint  # Must be imported inside function to be available to child processes
    
    seed(myseed)                            # set the seed to be the argument to the function
    random_num = randint(0, 10)             # return a random number between 0 and 10 based on the seed
    
    time.sleep(0.1)                         # pause for 0.1 sec
    
    return random_num**2

Now let's run this to get a baseline timing without multithreading.  Because of the pause in the function, we'll run for fewer values than before (otherwise, we'll be waiting a long time).

In [26]:
import time

t0 = time.time()

results = [random_squareT(i) for i in range(100)]        # Compute random)square for 100 inputs

print("First five and last five results:", results[0:5], "...", results[-5:])

t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")


First five and last five results: [25, 25, 64, 64, 49] ... [36, 16, 9, 16, 1]
Execution time = 11.09 sec.


This code sends 100 values to `random_squareT()` and therefore pauses 100 times for 0.1 sec each time, for a total "I/O" time of 10 seconds.  On the author's laptop, it takes about 11 sec to run the code, so nearly all of that is waiting time.

Now let's try the multithreaded approach using `concurrent.futures`.  This makes use of the **`ThreadPoolExecutor()`** method to distribute tasks among multiple threads.  The method that distributes the squaring task to the pool of processes is **`pool.map()`**. (Using `list()` on the outputs of the processes converts them into a list of values.)

In [27]:
from concurrent.futures import ThreadPoolExecutor

t0 = time.time()

if __name__ == '__main__':
    pool = ThreadPoolExecutor(max_workers=num_cores)                # create pool of threads
    results = list(pool.map(random_squareT, range(100)))            # distribute tasks; output made into a list

print("First five and last five results:", results[0:5], "...", results[-5:])

t1 = time.time()

print(f"Execution time = {t1-t0:.2} sec.")


First five and last five results: [25, 25, 64, 64, 49] ... [36, 16, 9, 16, 1]
Execution time = 0.81 sec.


Nice!  This takes only about 7% as long as the non-multithreaded example!

`concurrent.futures` also has a **`ProcessPoolExecutor`** for distributing to multiple processes.  If `ThreadPoolExecutor` is changed to `ProcessPoolExecutor` everywhere in the cell above, running the new cell gives an error (`BrokenProcessPool`, which indicates that at least one of the processes has terminated abruptly) on the author's laptop and on a second laptop, both running notebooks via the Anaconda Python distribution on Windows.  However, the `ProcessPoolExecutor` approach works on another machine (running Linux, and not Anaconda) and gives a time slightly longer than that for the `ThreadPoolExecutor` approach on that same machine (about 8% of the time for the non-parallelized example).  

Strangely, if the same replacement is made in the code cell but we call the function `random_square` rather than `random_squareT`, we get a different error (`cannot pickle 'module' object`) that indicates that the outputs of `pool.map()` cannot be extracted. 

An alternative and commonly used approach to structuring the code is to put all of it (besides the package imports) in the function `main()` and then call it using 
```python
if __name__ == '__main__':
    main()
```
This approach is shown below.  When run on the author's laptop, it also gives a `BrokenProcessPool` error.  However, when put in a Python file and run from an IDE like Spyder, it works fine (but takes about twice as long as `ThreadPoolExecutor` on this quasi-I/O-bound task).  When run on the Linux machine, it works and finishes in a time slightly longer than for the `ThreadPoolExecutor` approach of the code cell above.

In [None]:
import time
from concurrent.futures import ProcessPoolExecutor


def main(): 
    t0 = time.time()
    
    pool = ProcessPoolExecutor(max_workers=15)                         # create pool of processes
    results = list(pool.map(random_squareT, range(100), timeout=1))    # distribute tasks

    print("First five and last five results:", results[0:5], "...", results[-5:])

    t1 = time.time()
    
    print(f"Execution time = {t1-t0:.2f} sec.")


if __name__ == '__main__':
    main()

> **Conclusion:** Apparently, there's a problem running `ProcessPoolExecutor` in a notebook using Anaconda!

But here's another way to use the `ProcessPoolExecutor` that avoids the `BrokenProcessPool` problem.  We'll use it with the original `random_square` function, since we know multiprocessing is better for CPU-bound tasks than for I/O-bound ones.    

To use this approach, the function `random_square` was defined (exactly as it was earlier in this notebook) within a Python script file and saved in the same directory as this notebook.  That file can be created using any file editor.  If a program like Word or Notebook is used, the file should be saved as a text file, but with a `.py` extension.  That file then needs to be `import`ed in the code cell (without the `.py` at the end of the name), and the function defined in the file is called using the format `filename.function()`.  In the example below, the Python script file is called `randomSquare.py` and the function defined inside it is  `random_square`.

In [2]:
import time
from concurrent.futures import ProcessPoolExecutor
import randomSquare 


def main(): 
    t0 = time.time()
    
    vals = range(10000)                               # arguments to random_square
    with ProcessPoolExecutor() as executor:
        results = executor.map(randomSquare.random_square, vals)   # distribute function to cores
           
    t1 = time.time()
    
    print(f"Execution time = {t1-t0:.2f} sec.")
    

if __name__ == '__main__':
    main()

Execution time = 5.10 sec.


This is quite slow: it takes roughly half as long as the non-parallelized approach to do 1/100 the number of computations; i.e., it's about 50 times slower.  It's also not clear how to print out the results of the computation: multiple approaches all failed (with different errors) for the author.  

Running the same code but with `ThreadPoolExecutor` is much (about 8x) faster but, again, there doesn't seem to be a way to get the  output:

In [5]:
import time
from concurrent.futures import ThreadPoolExecutor
import randomSquare


def main(): 
    t0 = time.time()
    
    vals = range(10000)                               # arguments to random_square
    with ThreadPoolExecutor() as executor:
        results = executor.map(randomSquare.random_square, vals)   # distribute function to cores
     
    t1 = time.time()
    
    print(f"Execution time = {t1-t0:.2f} sec.")


if __name__ == '__main__':
    main()

Execution time = 0.36 sec.


### <font color="blue">15.2.2 Checking if a Number is Prime</font>  
Trying the approach of putting the code into `main()` on the prime-finding task (using `pool.map()`, as in three code cells earlier) again gives a `BrokenProcessPool` error in the notebook, but it runs in Spyder with a time of 5.6 sec on the author's laptop.

In [None]:
import time
from concurrent.futures import ProcessPoolExecutor
import multiprocess as mp


def main(): 
    t0 = time.time()
    
    num = 489133282872437279                                # number to check for primeness

    num_cores = mp.cpu_count()                              # number of cores 

    limits = subint_limits(num, num_cores)                  # get limits of subintervals

    pool = ProcessPoolExecutor(max_workers=15)
    results = list(pool.map(is_primePT, limits))            # distribute function to cores
    
    # Check results: if all subloops report True, then "num" is prime, otherwise it's not
    if all(results):
        print("Number is prime.")
    else:
        print("Number is not prime.")
    
    t1 = time.time()
    
    print(f"Execution time = {t1-t0:.2f} sec.")
    

if __name__ == '__main__':
    main()

Here we try the approach of defining the function that's to be run in parallel (`is_primeP`) in a Python script that's imported by the notebook:

In [9]:
import time
from concurrent.futures import ProcessPoolExecutor
import isPrimeP


def main(): 
    
    t0 = time.time()
    
    num = 489133282872437279                        # number to check for primeness
    num_cores = 16

    limits = subint_limits(num, num_cores)
    
    with ProcessPoolExecutor() as executor:
        results = executor.map(isPrimeP.is_primeP, limits)   # distribute function to cores

        
    # This does not work: gives "cannot pickle 'module' object" error
    '''
    # Check results: if all subloops report True, then "num" is prime, otherwise it's not
    if all(results):
        print("Number is prime.")
    else:
        print("Number is not prime.")
    '''
    
    t1 = time.time()
        
    print(f"Execution time = {t1-t0:.2f} sec.")
    

if __name__ == '__main__':
    main()

Execution time = 0.33 sec.


This appears to complete without an error in a short time, but, like before, there is no clear way to get the outputs (`results`) into a form that can be checked for `True`/`False` values, so we don't know if the code actually processed the subintervals properly.

So, it seems that `concurrent.futures` might not be the best approach for multi*processing* in a notebook, at least not for these types of tasks.  However, it is very effective for multi*threading*.

## <font color="blue">15.3 `joblib`</font>

**`joblib`** (https://joblib.readthedocs.io/en/latest/index.html#) is quick and easy to use for straightforward parallelization tasks, but it does not appear to be very flexible.  

### <font color="blue">15.3.1 Random Number Squaring</font> 
To perform the example of squaring random numbers with `joblib` using all the cores, we use the code below (making sure first to have run the early cell that determined `numCores` and the cell that defines `random_square`):

In [15]:
from joblib import Parallel, delayed
import time

t0 = time.time()

results = Parallel(n_jobs=num_cores, verbose=3)(delayed(random_square)(i) for i in range(1000000))

print("First five and last five results:", results[0:5], "...", results[-5:])

t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")


[Parallel(n_jobs=16)]: Using backend LokyBackend with 16 concurrent workers.
[Parallel(n_jobs=16)]: Done 112 tasks      | elapsed:    2.0s
[Parallel(n_jobs=16)]: Done 3088 tasks      | elapsed:    2.1s
[Parallel(n_jobs=16)]: Done 327696 tasks      | elapsed:    3.7s


First five and last five results: [25, 25, 64, 64, 49] ... [81, 36, 1, 1, 1]
Execution time = 5.93 sec.


[Parallel(n_jobs=16)]: Done 996496 tasks      | elapsed:    5.8s
[Parallel(n_jobs=16)]: Done 1000000 out of 1000000 | elapsed:    5.8s finished


Interesting: at best, this seems to be about 30% faster than the non-parallelized approach, but roughly 60% slower than the approach using `multiprocess.`  That suggests that `joblib`'s overhead is larger than `multiprocess`'s.

The status information in the shaded background is generated based on the `verbose` setting.  Omitting it produces no such output; the larger the number given, the more updates are shown during the computation.

Note that, by default, `joblib` performs process-based parallelization.  To force it to use thread-based parallelization, add `prefer="threads"` as another keyword argument where `n_jobs=num_cores` is specified.  (When tried in the example above, the code ran almost 8 times slower than it did without `prefer="threads"`.  That makes sense in connection with the discussion near the end of the Introduction: since this example involves CPU-intensive calculation but little data I/O, multi-threading won't help due to the GIL.)

### <font color="blue">15.3.2 Checking if a Number is Prime</font>  

Here is code to check if a number is prime using `joblib`:

In [18]:
from joblib import Parallel, delayed
import time


t0 = time.time()

num = 489133282872437279                                      # number to check for primeness

limits = subint_limits(num, num_cores) 

results = Parallel(n_jobs=num_cores, verbose=7)(delayed(is_primeP)(*limits[i]) for i in range(num_cores))


# Check results: if all subloops report True, then "num" is prime, otherwise it's not
if all(results):
    print("Number is prime.")
else:
    print("Number is not prime.")

t1 = time.time()                                              # time at end of process

print(f"Execution time = {t1-t0:.2f} sec.")

[Parallel(n_jobs=16)]: Using backend LokyBackend with 16 concurrent workers.


Number is prime.
Execution time = 3.73 sec.


[Parallel(n_jobs=16)]: Done   3 out of  16 | elapsed:    3.5s remaining:   15.4s
[Parallel(n_jobs=16)]: Done   6 out of  16 | elapsed:    3.5s remaining:    5.9s
[Parallel(n_jobs=16)]: Done   9 out of  16 | elapsed:    3.5s remaining:    2.7s
[Parallel(n_jobs=16)]: Done  12 out of  16 | elapsed:    3.6s remaining:    1.1s
[Parallel(n_jobs=16)]: Done  16 out of  16 | elapsed:    3.6s finished


In this code, the part `(*range_limits[i])` feeds the `i`th set of values in the `range_limits` tuple to `is_primeP()`.  (Recall that `range_limits` as defined by `subint_limits` consists of `num_cores` "subtuples" of three elements each.)  The `*` preceding `range_limits` "unpacks" the elements of the `i`th tuple into the separate values (`n`, `a`, `b`) that `is_primeP()` takes as arguments.

The time to execute the computation is comparable to that using `multiprocess`.

## <font color="blue">15.4 `ipyparallel`</font>

Another package for performing parallel processing is IPython Parallel, or **`ipyparallel`** (https://ipyparallel.readthedocs.io/en/latest/), designed specifically for Jupyter notebooks.  Note that `ipyparallel` documentation and some function outputs use the word "engine" to mean a process (more or less).  

To get started with `ipyparallel`, you'll want to make sure you have a recent enough version installed (7.0 or later).  To display the version number, use this code:

In [1]:
import ipyparallel as ipp

ipp.__version__

'8.4.1'

Now we start a "cluster" of engines.  The "hub" that distributes jobs to the engines can be assigned one core of the computer, so we'll start `numCores-1` engines for computation:

In [5]:
rc = ipp.Cluster(n=num_cores-1).start_and_connect_sync()

Starting 15 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>


  0%|          | 0/15 [00:00<?, ?engine/s]

Each cluster has a unique Cluster ID, which we will need later.  To see the running clusters with their IDs and numbers of engines, use the operating system (shell) command (starting with the "bang" character, `!`) below:

In [6]:
!ipcluster list

PROFILE          CLUSTER ID                       RUNNING ENGINES LAUNCHER
default          1682719542-qij4                  True         15 Local


2023-04-28 18:06:06.558 [IPClusterList] ERROR | Engine 13 not running: Process 2684
2023-04-28 18:06:06.558 [IPClusterList] ERROR | Engine set default:1680293368-46t41680293369 not running: No engines left


If you're working in a Jupyter notebook (but not in JupyterLab), you also should be able to see the clusters in the `IPython Clusters` tab at the top of your Jupyter home page. 

### <font color="blue">15.4.1 Random Number Squaring</font>

Below is `ipyparallel` code that will run our `random_square` function in parallel.  It uses the `load_balanced_view()` method, which is meant for situations in which the tasks could take significantly different amounts of time, or in which we want each core to do roughly the same amount of work. 

In [7]:
import time

t0 = time.time()
                                  
lbv = rc[:]                                                   # use all engines

results = lbv.map_sync(random_square, range(1000000))         # run `random_square` on the engines

print("First five and last five results:", results[0:5], "...", results[-5:]) 

t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")

First five and last five results: [25, 25, 64, 64, 49] ... [81, 36, 1, 1, 1]
Execution time = 1.25 sec.


This is very good: at best it takes about 15% as long as the non-parallelized approach, and is shorter than any other time except for that using `concurrent.futures`.  (On the author's laptop the fastest run took 0.79 sec.)

The alternative to the "load balanced view" above is the "direct view".  It is implemented by defining a "client," which connects to the cluster of engines.  Once the client (labeled `dv` below) is created, we use the same `.map_sync()` method used above to run the function of interest on the cluster.  This requires supplying the `cluster_id`, which is why we displayed it above (just copy and paste it below).

In [9]:
import time

t0 = time.time()

client = ipp.Client(cluster_id='1682719542-qij4')                # create a "client" using active cluster

dv = client[:]                                                   # use all engines

results = dv.map_sync(random_square, range(1000000))             # distribute tasks to the pool of engines

print("First five and last five results:", results[0:5], "...", results[-5:])

t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")

First five and last five results: [25, 25, 64, 64, 49] ... [81, 36, 1, 1, 1]
Execution time = 1.31 sec.


For the author this took about the same time as for the load-balanced approach.  

### <font color="blue">15.4.2 Checking if a Number is Prime</font>

Now let's try finding primes with ipyparallel.  We'll use the direct view approach and the `.map()` function again.  We'll have to make a small modification to the `is_primeP()` function, though, so that it can deal with a tuple as the argument and correctly break out its elements.  (As a reminder, the tuples consist of the number whose primeness we're checking, `num`, and the boundaries of the subinterval,`a`, and `b`.)  That modified function (with a `T` appended to the name) is below.  (The new code is just the top section where `n`, `a`, and `b` are pulled out as the elements of the tuple provided as the argument.)

In [10]:
def is_primePT(tup):
    '''Checks if 'n' has prime factors between 'a' and 'b'.
       Returns "False" if prime factor found, "True" otherwise.'''
               
    n = tup[0]
    a = tup[1]
    b = tup[2]
    
    # Restrict arguments to positive integers
    if (not isinstance(n, int)) or (n < 1):
        raise ValueError("Input should be a positive integer.\n") 
        
    if (not isinstance(a, int)) or (a < 0) or (not isinstance(b, int)) or (b < 0) or (b < a):
        raise ValueError("Limits should positive integers, with b > a.\n")

        
    # Check special cases
    if n == 1:
        print("1 is not prime.")       # n = 1 has only one factor
        return False       
    if n == 2:
        print("2 is prime.")
        return True
    if n % 2 == 0:
        print("Not prime: 2 is a factor.")
        return False
    
    
    # Adjust a & b if not odd
    if a % 2 == 0:
        a = a - 1
    if b % 2 == 0:
        b = b + 1
        
        
    # Check for factors, for odd numbers from a to b 
    for x in range(a, b+1, 2):
        if n % x == 0:
            return False
    return True


And here's the code to distribute the computation:

In [16]:
import time
from numpy import sqrt, zeros

t0 = time.time()

# Create a "client" using active cluster. USER MUST SUPPLY CLUSTER ID from 
# "!ipcluster list" run in code cell, or IPython Clusters notebook tab.
client = ipp.Client(cluster_id='1682719542-qij4')                

dv = client[:]                                                   # use all engines

num = 489133282872437279                                         # number to check for primeness

limits = subint_limits(num, num_cores-1)                        # get tuple of subinterval limits

results = dv.map(is_primePT, [limits[i] for i in range(num_cores-1)])

# Check results: if all subintervals report True, then "num" is prime; otherwise, it's not
if all(results.result()):
    print("Number is prime.")
else:
    print("Number is not prime.")
    
t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")

Number is prime.
Execution time = 3.68 sec.


On the author's computer, this is about 10% faster than the approach using `multiprocess`.  

As an aside, we can compare this to the time for one subinterval to run on one engine.  To call a single engine, use `client[i].apply()` for some index `i`:

In [28]:
t0 = time.time()

results = client[0].apply(is_primePT, limits[0])

# Check results: if all subintervals report True, then "num" is prime; otherwise, it's not
if results.result():
    print("Number is prime.")
else:
    print("Number is not prime.")
    
t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")

Number is prime.
Execution time = 1.83 sec.


On the author's computer, this took about half the time that it took to run all the subintervals on all the cores.  If every core takes about the same time, and they run simultaneously, that suggests that the overhead involved in using `.map()` accounts for about half the run time in the calculation involving all the subintervals.    

To shut down the engines at the end of a session, click the Stop button for the cluster on the `IPython Cluster` tab on the jupyter home page.  (If that doesn't work, you should be able to run `rc.shutdown(hub=True)` from a code cell in a notebook.  If for some reason you don't want to shut down the hub, don't include `hub=True`.)

## <font color="blue">15.5 `dask`</font>

As noted above, the `dask` package, which is included by default in the Anaconda distribution (and based on `concurrent.futures`), was created to work well with `numpy`, `pandas`, and the machine learning packages `scikit-learn`.  It's especially designed to work well with `numpy` arrays and `pandas` "dataframes" (similar to database tables).  However, the `dask` "scheduler" that distributes tasks to cores takes 1 millisecond per task to do its work, so it will not speed up all computations, and may slow down those in which the basic task is very quick.

One way to use `dask`, called "dask delayed," can, in principle, parallelize any code that's parallelizable.  It uses two key tools to run code in parallel automatically: the **`@dask.delayed`** "decorator," and the **`.compute()`** method.  The decorator tells Python not to actually run the following code, but just to "set it up to run;" the method then tells it to do the computation (in parallel).

To use these with our `random_square` function, we first need to create a cluster of cores and the "scheduler" that will distribute the tasks to the cores (below, 15 cores, or "workers" are created).  As in previous parallel processing packages, we should ensure that we're in the `main` process, to avoid a recursive sequence of cluster creations:

In [1]:
if __name__ == "__main__": 

    from dask.distributed import Client
    client = Client(n_workers=15)

You can get a "Dashboard" display of information about the cluster by executing the command `client` and then clicking on the URL link that appears.  That will open a new browser tab with several real-time plot windows showing memory usage, CPU activity, and other properties.  Other displays can be shown using the "tabs" near the top of the Dashboard (e.g., "Status," "Workers," "Tasks," etc.).  "Status" is the main, default tab.

In [2]:
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 15
Total threads: 30,Total memory: 15.35 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:61945,Workers: 15
Dashboard: http://127.0.0.1:8787/status,Total threads: 30
Started: Just now,Total memory: 15.35 GiB

0,1
Comm: tcp://127.0.0.1:62031,Total threads: 2
Dashboard: http://127.0.0.1:62038/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61948,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-oh_ljcht,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-oh_ljcht

0,1
Comm: tcp://127.0.0.1:62018,Total threads: 2
Dashboard: http://127.0.0.1:62025/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61949,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-aq6y79f3,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-aq6y79f3

0,1
Comm: tcp://127.0.0.1:62048,Total threads: 2
Dashboard: http://127.0.0.1:62051/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61950,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-oo5j10s_,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-oo5j10s_

0,1
Comm: tcp://127.0.0.1:62041,Total threads: 2
Dashboard: http://127.0.0.1:62046/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61951,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-lz2iahcr,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-lz2iahcr

0,1
Comm: tcp://127.0.0.1:62024,Total threads: 2
Dashboard: http://127.0.0.1:62035/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61952,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-y74fr9cg,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-y74fr9cg

0,1
Comm: tcp://127.0.0.1:62045,Total threads: 2
Dashboard: http://127.0.0.1:62049/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61953,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-jkfpg8jc,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-jkfpg8jc

0,1
Comm: tcp://127.0.0.1:62023,Total threads: 2
Dashboard: http://127.0.0.1:62033/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61954,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-lbmeo7hj,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-lbmeo7hj

0,1
Comm: tcp://127.0.0.1:62022,Total threads: 2
Dashboard: http://127.0.0.1:62029/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61955,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-b4jdwg51,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-b4jdwg51

0,1
Comm: tcp://127.0.0.1:62017,Total threads: 2
Dashboard: http://127.0.0.1:62020/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61956,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-wqer4_yi,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-wqer4_yi

0,1
Comm: tcp://127.0.0.1:62037,Total threads: 2
Dashboard: http://127.0.0.1:62043/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61957,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-zg74gjft,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-zg74gjft

0,1
Comm: tcp://127.0.0.1:62019,Total threads: 2
Dashboard: http://127.0.0.1:62026/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61958,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-o_kh7qbz,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-o_kh7qbz

0,1
Comm: tcp://127.0.0.1:62032,Total threads: 2
Dashboard: http://127.0.0.1:62040/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61959,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-98n3nent,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-98n3nent

0,1
Comm: tcp://127.0.0.1:62011,Total threads: 2
Dashboard: http://127.0.0.1:62013/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61960,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-fijqadm_,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-fijqadm_

0,1
Comm: tcp://127.0.0.1:61990,Total threads: 2
Dashboard: http://127.0.0.1:62009/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61961,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-7av8odpa,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-7av8odpa

0,1
Comm: tcp://127.0.0.1:62012,Total threads: 2
Dashboard: http://127.0.0.1:62015/status,Memory: 1.02 GiB
Nanny: tcp://127.0.0.1:61962,
Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-ftjfntp0,Local directory: C:\Users\Me\AppData\Local\Temp\dask-worker-space\worker-ftjfntp0


### <font color="blue">15.5.1 Random Number Squaring</font>

To parallelize the code, import `dask`, include the `@dask.delayed` decorator before the definition of any function that's to be run in parallel, and use the `.compute()` method to get the results:

In [3]:
import dask
import time

@dask.delayed
def random_square(myseed):
    from numpy.random import seed, randint  # Must be imported inside function to be available to child processes
    
    seed(myseed)                            # set the seed to be the argument to the function
    random_num = randint(0, 10)             # return a random number between 0 and 10 based on the seed
    
    return random_num**2


t0 = time.time()                           # start time

out = []
for i in range(10000):                     # loop to define work to be performed
    res = random_square(i)                 # because random_square is "delayed," it won't run when this loop executes
    out.append(res)                        # store to-be-computed value

results = dask.compute(*out)               # *this* line will run random_square on the arguments given in the for loop

print("First five and last five results:", results[0:5], "...", results[-5:])

t1 = time.time()                           # end time

print(f"Execution time = {t1-t0:.2f} sec.")

First five and last five results: (25, 25, 64, 64, 49) ... (9, 4, 9, 81, 36)
Execution time = 13.00 sec.


Here, the `random_square` function is defined and a `for` loop is set up to provide its arguments, but the calls to the function in the loop are not actually performed; they're merely defined.  (Hence the "`delayed`" in `@dask.delayed`.)  The computation gets done later by `dask.compute(*out)`, which runs on the yet-to-be-evaluated quantities defined by the loop.

Unfortunately, this approach is quite slow: on the author's laptop it takes about three times longer to process 10,000 inputs than the unparallelized approach takes to operate on 1,000,000 inputs.  Most likely, that's because of the schedule overhead noted above.  (The last five outputs differ from those in the earlier calculations because the limit of the `range` function here is 10,000, not 1,000,000, so the last five `i` values are different.)

An easier approach to parallelization is to use `dask`'s **`.map()`** method, similar to what we've seen with other packages.  Here's that approach, again with 10,000 values supplied as arguments.  (Watch the "Task Stream" panel on the Dashboard as the code cell below runs and you'll see when each of the workers/cores, corresponding to rows in the display, are active.)

In [4]:
import dask
import time


def random_square(myseed):
    from numpy.random import seed, randint  # Must be imported inside function to be available to child processes
    
    seed(myseed)                            # set the seed to be the argument to the function
    random_num = randint(0, 10)             # return a random number between 0 and 10 based on the seed
    
    return random_num**2


t0 = time.time()                          # start time

out = client.map(random_square, range(10000))

results = client.gather(out)

print("First five and last five results:", results[0:5], "...", results[-5:]) 

t1 = time.time()                          # end time

print(f"Execution time = {t1-t0:.2f} sec.")

First five and last five results: [25, 25, 64, 64, 49] ... [9, 4, 9, 81, 36]
Execution time = 7.28 sec.


This is better, but still not great: it takes nearly fifty times as long per computed value as the non-parallelized approach.  Presumably, it works better on much larger data sets, like those for which `pandas` often is employed, where the overhead is a much tinier fraction of the total workload involved.

One more option is to use **`dask bag`**, which can parallelize simple computations.  The code below shows how to use it.

In [5]:
import dask.bag as db

t0 = time.time()                          # start time

b = db.from_sequence(range(10000), npartitions=15)    # specify iterables and how to group them

results = b.map(random_square).compute()

print("First five and last five results:", results[0:5], "...", results[-5:]) 

t1 = time.time()                          # end time

print(f"Execution time = {t1-t0:.2f} sec.")

First five and last five results: [25, 25, 64, 64, 49] ... [9, 4, 9, 81, 36]
Execution time = 0.67 sec.


Here, the line `b = db.from_sequence(range(N), npartitions=15)` specifies that the set of iterables to feed to our function is `range(N)`, but those values are to be grouped into 15 sets (to match the number of workers; this doesn't have to be the case).

This approach is faster than either of the other two `dask` approaches, but still slower per value (by a factor of roughly 10) than the non-parallelized approach.

### <font color="blue">15.5.2 Checking if a Number is Prime</font>

Here's code to use `dask` to check our prime number (note that it uses `is_primePT` rather than `is_primeP`, so that we can feed it entire tuples at once):

In [11]:
import dask.bag as db
import multiprocess as mp

t0 = time.time()                                                      # start time

num = 489133282872437279                                              # number to check for primeness

num_cores = mp.cpu_count()

limits = subint_limits(num, num_cores)

b = db.from_sequence(limits, npartitions=num_cores)   # specify iterables and how to group them

results = b.map(is_primePT).compute()

# Check results: if all subintervals report True, then "num" is prime; otherwise, it's not
if all(results):
    print("Number is prime.")
else:
    print("Number is not prime.")
    
t1 = time.time()

print(f"Execution time = {t1-t0:.2f} sec.")

Number is prime.
Execution time = 5.40 sec.


This is a little slower than the non-parallelized approaches.

To finish up, we have to close the cluster we created:

In [12]:
client.close()

### Summary of Results ###

The following table summarizes the times taken on the author's computer to solve the tasks of random number squaring and checking if a number is prime using the packages introduced above.  For random number squaring, the values are seconds per $10^6$ values squared; for checking primeness, the values are seconds.  (Times are for specific runs.  Run times for a given task can vary somewhat.)

|       Package      | Number Squaring | Checking Primeness | 
|:-------------------|:---------------:|:------------------:|
|  non-parallelized  |       5.4       |         28         |
|   `multiprocess`   |       2.4       |         4.1        |
|`concurrent.futures`|     312/43*     |         5.6**      |
|      `joblib`      |       5.9       |         3.7        |
|   `ipyparallel`    |       1.3       |         1.8        |
|       `dask`       |       67        |         5.4        |

    
<center><font size=2>*Using ProcessPoolExecutor/ThreadPoolExecutor</font></center> 
<center><font size=2>**Run in Spyder</font></center> 


Based on this limited set of tests, it seems that `ipyparallel` gives the greatest speed-up of computations.  Of course, that conclusion might not hold for other tasks, but it would make sense to try `ipyparallel` before trying the other packages.

## <font color="blue">15.6 `cuda`</font>

**`cuda`** is a package used to implement multiprocessing on GPUs.  However, only certain GPUs are compatible with it.  If you have a standard personal computer with an "integrated" graphics processor (rather than a "dedicated" one), most likely it cannot run `cuda`.  A gaming-focused computer might be able to run it, though.

As noted in Module 0, Python is an interpreted rather than compiled computing language.  In practice, that means that Python code generally isn't as fast as code written in compiled languages like C.  **`numba`** is a ***compiler*** for Python that essentially makes it work like a compiled language.

Using CUDA together with with NUMBA, Python programmers can create code to run efficiently on GPUs.  These packages will not be discussed in this module, but further information can be found here https://developer.nvidia.com/how-to-cuda-python, and notebooks containing simple examples are available here: https://github.com/ContinuumIO/gtc2017-numba. 

### <font color="blue">Recap</font>

* A multithreading approach will work better for code that is I/O-bound, while multiprocessing will work better for CPU-bound tasks.  Scientific modeling is very likely to fall into the latter category (except possibly when reading from or writing to a large data file).  
<br>

* Code written in a non-parallelized approach very likely would need to be modified to work in a parallelized approach.  
<br>

* There are many approaches to, and packages for, parallelization in Python, only some of which have been presented here.  Of those, `joblib` seems to be the easiest to use, but the `ipyparallel` package produced the fastest performance for multi*processing*, at least for the two example applications used here across all the packages.  However, that conclusion might not hold for other applications, which is why it's good to know about the other packages.  For multi*threading*, the functions provided in the `concurrent.futures` package gave very good results, but the multi*processing* functions in that package led to problems in the notebook.  
<br>

* All of the packages introduced above have significantly more capabilities than have been presented in this module.  For example, in addition to `map` or `pool` functions that automatically distribute tasks among cores, at least some of the packages allow the user to send specific tasks to specific processors.  Read their documentation for more details.  
<br>

* This module has not discussed message passing or queues at any length, but those are necessary in some applications in order to share data between processes.  MPI is the parallelization approach to use when the number of cores involved exceeds roughly 1000.  

### <font color="blue">Reflection Prompts</font>

These questions are intended to help you think about what you learned from this module and how it might be useful to you in the future. You are strongly encouraged to answer them before moving on to the next module.  

- Which components of this module did you find you were easily able to work through, and why do you think they were especially easy for you?

- Which components of this module did you find more difficult to work through, and why do you think they were challenging?

- When you got stuck, what did you do to get unstuck? Could this or similar actions be helpful if you get stuck in future work?

- What do you understand more deeply about this material?

- What questions or uncertainties remain for you regarding this material?

### <font color="blue">Exercises</font>

<u>**Exercise #1**</u>  
In Module 3, Mastery Exercise #3 asked you to simulate the elastic bouncing of a ball inside a square 2-D box in the absence of gravity and drag.  In this Exercise, we'll model a collisionless, thermal gas in a square 2-D box by modeling the motion of a large number of (identical) particles (which could be atoms or molecules).  Since the gas is collisionless, the particles do not interact with each other and their positions evolve independently; as a result, their trajectories can be computed using a parallelized approach.

**(a)** Write a Python function to model such a gas (without gravity) of $N$ particles.  (Make sure to deal with the elastic collisions at the walls, and let the dimensions of the box be a variable that can be changed, but set it to 1 by default.)  Assume the particles' initial positions inside the box are random, as are the directions of their velocities, and that their speeds are governed by the 2-D Maxwell-Boltzmann (M-B) distribution, such that the probability (density) of a particle having speed in the range $v \rightarrow v + dv$ is given by

$$P(v) = \left( \frac{m}{kT} \right) v \, e^{-mv^2/2kT},$$

where $m$ is the mass of each particle, $k = 1.381 \times 10^{-23} \, \textrm{J/K}$ is Boltzmann's constant, and $T$ is the temperature in Kelvin.  $m$, $T$ should be adjustable parameters in your function (but they need not be arguments).  

**Notes:** 
- The M-B distribution is available in the `scipy.stats` package as a special case of the `chi` distribution, where the parameter `2` must be provided.  That is, you can get the M-B probability distribution with a function call of the form `out = chi.rsv(2, size=N)`, where `N` is the number of output values you want the function to return.  (The function call `rayleigh(size=N)` will do the same thing.)  Each return value will be drawn from the M-B distribution such that the higher the value of the M-B function for a given `x`, the more likely it is that values near `x` will be returned.  (As a quick "warm-up project," plot a histogram of this distribution.  Do that for a large number of points, 100000 or more, and maybe 50 histogram bins, so that you get a good representation of the distribution.  Compare it with a plot of the M-B distribution, which you can find on Wikipedia, for example.)  Keep in mind that `chi` reproduces the simple distribution function $f(x) = x^2 e^{-x^2/2}$, which is not quite what we need, so you have to relate $v$ in $P(v)$ to $x$ in $f(x)$ and use their relation to scale the output of `chi` to give speeds that depend properly on $m$, $k$, and $T$.  
<br>

- For atom/molecule-size particles, the speeds will be on the order of a few hundred meters per second, so you will want to use an appropriately small time step.

Record the position of each particle for a time period that's a couple orders of magnitude larger than your timestep, with $T = 300$ K and $m$ the mass of a nitrogen molecule (N$_2$).  

**(b)** Parallelize your code, compute the trajectories of 100 particles throughout the full time span (confirming via your computer's Resource or Activity Monitor that nultiple cores are running), and plot all the trajectories inside the box.  

<font size="3pt">**Breakpoint Answers**</font>

**Breakpoint 1**  
The non-parallelized code should find the prime factor faster, since the parallelized code has to do extra computations (to create the iterable) and also has to distribute the subintervals to the worker processes and wait for all the results to come back.