# Parallel Programming with Python

Python has the ability to run in parallel, using both shared memory and distributed memory methods.  This tutorial is meant to give you a brief introduction to what's available and, more importantly, when it's appropriate to use.

## Python Threading

Generally, Python threading is terrible.  But it shouldn't be:

* POSIX threads
* Shared memory with parent process
* Lightweight threads

### Global Interpreter Lock (GIL)

In order to keep memory coherent, the Python intrepter only allows a single thread to run at once....killing performance for any kind of shared memory workload.

There are (some) good reasons for this (I/O, intrepreter maintenance, etc.)

### Example: Calculate Pi with Python Threads

Simple process:
* Inscribe a circle in a square
* Throw darts at it
* Count how many are inside the circle and how many are outside
* Use the ratio of those to compute pi

<img src="../img/circle_and_square.png" style="height:350px">

In [1]:
from threading import Thread, Lock
import random

lock = Lock() # lock for making operations atomic

def calcInside(nsamples,rank):
    global inside # we need something everyone can share random.seed(rank)
    random.seed(rank)
    for i in range(nsamples):
        x = random.random()
        y = random.random()
        if (x*x)+(y*y)<1:
            lock.acquire() # GIL doesn't always save you
            inside += 1
            lock.release()

if __name__ == '__main__':
    nt=4 # thread count
    inside = 0 # initialise
    samples=int(10e5/nt)
    threads=[Thread(target=calcInside, args=(samples,i)) for i in range(nt)]
    
    for t in threads: t.start()
    for t in threads: t.join()
    
    print((4.0*inside)/(1.0*samples*nt))

3.138488


A few notes on the above code:

* Python data-structures (e.g. lists, dictionaries, etc.) are thread-safe, meaning that when updating their values the GIL is held and other threads wait until it's released before updating object  
<br>
* Simpler data structures like integegers aren't thread-safe, which means we could have mulitple threads accessing the same object and either data could be corrupted or missed  
<br>
* We have to explicitly lock our `inside` counter in the above code to avoid this

## Subprocess

Python's `subprocess` module allows the Python intrepter to spawn and control processes that aren't affected by the GIL.  The basic command in the `subprocess` module is `Popen()`, which lets you open a proces:

In [2]:
import subprocess
pi=subprocess.Popen('python -c "import math; print(math.pi)"',shell=True,stdout=subprocess.PIPE)

In [3]:
pi.stdout.read()

b'3.141592653589793\n'

In [4]:
pi.pid

40

Some issues with subprocess:
* Shared memory is tricky at best
* Locks and atomics are difficult

It's really designed for launching independent processes:

In [4]:
cmds = [
    'echo foo',
    'echo bar',
    'date',
    'hostname'
]

tasks = [subprocess.Popen(c, shell=True) for c in cmds]
for t in tasks: t.wait()

You can see the output of those subprocesses with `docker logs jupyter`

```console
foo
bar
Wed Apr 10 05:48:08 UTC 2019
023c8899762c
```

Let's look at another method...

## Multiprocessing

This module blends together Python threads and subprocesses.  It bypasses the GIL, so threads can be used and see some performance.  Under the hood it uses subprocesses, but has a manager to handle things like synchronization and distribuited sharing (but still not true shared memory).

### Calculating pi with Multiprocessing

We will use the `Pool` module to calculate pi.  `Pool` allows you to define a group of worker processes that you will then divide some work amongst.  `Pool` takes two inputs:

* A function that we want to run across the pool of workers
* An iterable...some way to identify how we're splitting up work

In [6]:
import multiprocessing as mp
import numpy as np
import random

processes = mp.cpu_count()
nsamples = int(10e5/processes)

def calcInside(rank):
    inside = 0
    random.seed(rank)
    for i in range(nsamples):
        x = random.random();
        y = random.random();
        if (x*x)+(y*y)<1:
            inside += 1
    return (4.0*inside)/nsamples

# Important to check if main so child processes don't try to run it
if __name__ == '__main__':
    pool = mp.Pool(processes)
    result = pool.map(calcInside, range(processes))
    print(np.mean(result))

3.14226


The `Multiprocessing` module has support for other parallel constructs like process communication and locks.  We won't go into them today, but you should be aware of them.

While Multiprocessing is certainly an improvement over `subprocess` and Python threads, it does come with overhead that impacts performance.  Additionally, it will only scale to a single node (no distributed memory capability).

In order to do that, we need...

## mpi4py

`mpi4py` is a set of bindings to make use of MPI, Message Passing Interface.  MPI forms the basis of most applications that run on HPC systems today.  We won't cover MPI today, but it is important to understand a few basics to understand what `mpi4py` is doing.

Simply, all MPI allows is for processors to communicate data between each other.   Each process executes the same instructions (or code), but on different parts of the data.  At points throughout the computation, they may need to send or receive data to/from memory locations that are non-local.  MPI is the API that allows for this.

### Hello World

```python
from mpi4py import MPI
import sys

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))
```

In [7]:
!mpirun -np 4 python ../code/mpi4py/helloworld.py

Hello, World! I am process 0 of 4 on 023c8899762c.
Hello, World! I am process 2 of 4 on 023c8899762c.
Hello, World! I am process 3 of 4 on 023c8899762c.
Hello, World! I am process 1 of 4 on 023c8899762c.


### Point-to-Point Communication

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1)
elif rank == 1:
    data = comm.recv(source=0)
    print('On process 1, data is ',data)
```

In [8]:
!mpirun -np 4 python ../code/mpi4py/pt2pt.py

On process 1, data is  {'a': 7, 'b': 3.14}


We sent a dictionary, but we can also send NumPy arrays (and we should try to do that all the time):

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    # in real code, this section might
    # read in data parameters from a file
    numData = 10  
    comm.send(numData, dest=1)

    data = np.linspace(0.0,3.14,numData)  
    comm.Send(data, dest=1)

elif rank == 1:

    numData = comm.recv(source=0)
    print('Number of data to receive: ',numData)

    data = np.empty(numData, dtype='d')  # allocate space to receive the array
    comm.Recv(data, source=0)

    print('data received: ',data)
```

In [9]:
!mpirun -np 4 python ../code/mpi4py/pt2pt_numpy.py

Number of data to receive:  10
data received:  [0.         0.34888889 0.69777778 1.04666667 1.39555556 1.74444444
 2.09333333 2.44222222 2.79111111 3.14      ]


### Collectives

Collectives are operations that all processors execute together.  They may execute at slightly different times, but they all will call the same function.  These are useful for operations like gathering data onto a root process, or distributing data from one to all.

Hers' an example of performing a `gather` operation:

<img src="../img/gather.png" style="height:150px">

```python
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()   

numDataPerRank = 10  
sendbuf = np.linspace(rank*numDataPerRank+1,(rank+1)*numDataPerRank,numDataPerRank)
print('Rank: ',rank, ', sendbuf: ',sendbuf)

recvbuf = None
if rank == 0:
    recvbuf = np.empty(numDataPerRank*size, dtype='d')  

comm.Gather(sendbuf, recvbuf, root=0)

if rank == 0:
    print('Rank: ',rank, ', recvbuf received: ',recvbuf)
```

In [10]:
!mpirun -np 4 python ../code/mpi4py/gather.py

Rank:  3 , sendbuf:  [31. 32. 33. 34. 35. 36. 37. 38. 39. 40.]
Rank:  2 , sendbuf:  [21. 22. 23. 24. 25. 26. 27. 28. 29. 30.]
Rank:  1 , sendbuf:  [11. 12. 13. 14. 15. 16. 17. 18. 19. 20.]
Rank:  0 , sendbuf:  [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10.]
Rank:  0 , recvbuf received:  [ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12. 13. 14. 15. 16. 17. 18.
 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35. 36.
 37. 38. 39. 40.]


mpi4py has the ability to ship *any* serialisable Python object.  That means that objects like `dicts` need to be converted to a byte stream, a process called pickling.  That means a Python object (except for strings and ints) needs to be pickled, sent over MPI, and then repickled...adding significant overhead.

However, arrays in NumPy map to C memory allocations, and mpi4py can send them at *almost* the speed of C/C++/Fortran:

<img src="../img/allgather_bench.png" style="height:450px">
<img src="../img/latency_bench.png" style="height:450px">
<img src="../img/bandwidth_bench.png" style="height:450px">

## Computing Pi

Now let's look at how we can compute pi with mpi4py.

In [11]:
from mpi4py import MPI
import numpy

# Function to calcualte pi that each MPI rank will use
def compute_pi(samples):
    count = 0
    for x, y in samples:
        if x**2 + y**2 <= 1:
            count += 1
    pi = 4*float(count)/len(samples)
    return pi

# Set up our MPI environment
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()

# Processor 0 generates random samples that each processor will use
if myrank == 0:
    N = 100000 // nprocs
    samples = numpy.random.random((nprocs, N, 2))
else:
    samples = None

# Distribute the samples amongst all processors wiht MPI_Scatter
samples = comm.scatter(samples, root=0)

# Each processors calculates their value of pi (we'll take the average)
mypi = compute_pi(samples) / nprocs

# MPI_Reduce collects all individual 
pi = comm.reduce(mypi, op=MPI.SUM, root=0)

if myrank == 0:
    error = abs(pi - numpy.pi)
    print("pi is approximately %.16f, error is %.16f" % (pi, error))

pi is approximately 3.1450000000000000, error is 0.0034073464102069


That runs on a single MPI process, so let's launch it in parallel:

In [12]:
!mpirun -np 4 ../code/mpi4py/pi_mpi.py

pi is approximately 3.1414663999999997, error is 0.0001262535897935


## ipyparallel

ipyparallel is Python package for creating and running clusters in Jupyter (used to be known as IPython.parallel).  It offers a nice, interactive method for developing and running parallel Python applications.

I'd still recommend using a more traditional approach of a standalone Python script and batch script submitted to a scheduler for large-scale production runs, but for rapid development and prototyping, ipyparalell is a valuable tool.

There a 4 parts to the ipyparallel architecture:

<img src="../img/ipyparallel_overview.png" style="height:450px">

**Engine**  
An engine is a Python instance that can accept commands, run code, and return results.  You can run multiple engines, allowing for parallel and distributed computing.

**Schedulers**  
Any commands that are to be run on an engine first go through a scheduler.  The engines will block when executing code, so the scheduler will manage requests in the background.

**Client**  
This ia a Python object that lets you connect to a ipyparallel cluster.  
**Hub**  
The hub is the brain of an ipyparallel cluster.  It manages connections to engines, shcedulers, and clients

We won't go into all the details of ipyparallel (there's a lot), but we will start a small cluster to run our mpi4py pi code.

You can start clusters via command line options:

```python
ipcluster start -n 4
```
but we'll do it via a Jupyter notebook extension.  There is an `Ipython Clusters` tab where you can start a cluster.  Select 4 engines and start the cluster:

<img src="../img/jupyter-cluster.png" style="height:250px">

It will take a few seconds to start up the cluster, but once it's running we can create a client that we'll use to connect to our cluster:

In [14]:
import ipyparallel as ipp
client = ipp.Client()
client.ids

[0, 1, 2, 3]

Here you can see the IDs of the 4 engines we have runninng in our cluster.

ipyparallel has a concept called a **view**, which is way to access the engines availalble.  A **direct** view simply lets you send explicit commands to specific engines (e.g. tell each engine to run the same `compute_pi()` function).  A **load balanced** view is more like the `multiprocessing` module we saw previously; you send a command to a pool of workers, and the scheduler will handle which engine it runs on (depending on which one is available).

We're going to just focus on the direct views in this example.  To start we simply define what engines we want to use in our direct view:

In [15]:
dview = client[:]
dview

<DirectView [0, 1, 2, 3]>

Here's the same pi calculation code we've seen before, but we're going to do in parallel without mpi4py:

In [16]:
from random import random
from math import pi
dview['random'] = random

# Serial version
def serial_pi(nsamples):
    s = 0
    for i in range(nsamples):
        x = random()
        y = random()
        if x*x + y*y <= 1:
            s+=1
    return 4.*s/nsamples

# Parallel version
def parallel_pi(view, nsamples):
    p = len(view.targets)
    if nsamples % p:
        # ensure even divisibility
        nsamples += p - (nsamples%p)
    
    subsamples = nsamples//p
    
    ar = view.apply(serial_pi, subsamples)
    return sum(ar)/p

The code is similar to the `mpi4py` version:
* We define a serial algorithm
* We divide up the number of samples based on the number of engines in our view
* The serial algorithm is applied across all engines, and the final answer is aggreated.

Let's see the serial version performs:

In [17]:
%%timeit
serial_pi(int(1e7))

3.45 s ± 84.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


We'll run the parallel version, passing in the list of engines in our view that we'll use:

In [18]:
%%timeit
parallel_pi(dview, int(1e7))

1.41 s ± 17.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
import ipyparallel as ipp
from mpi4py import MPI
import numpy

# Function to calcualte pi that each MPI rank will use
def compute_pi(samples):
    count = 0
    for x, y in samples:
        if x**2 + y**2 <= 1:
            count += 1
    pi = 4*float(count)/len(samples)
    return pi

client = ipp.Client(profile='mpi')
view = client[:]

print("Client MPI.COMM_WORLD.Get_size()=%s" % MPI.COMM_WORLD.Get_size())
print("Client engine ids %s" % client.ids)

# Set up our MPI environment
comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()

# Processor 0 generates random samples that each processor will use
if myrank == 0:
    N = 100000 // nprocs
    samples = numpy.random.random((nprocs, N, 2))
else:
    samples = None
# Distribute the samples amongst all processors wiht MPI_Scatter
samples = comm.scatter(samples, root=0)
# Each processors calculates their value of pi (we'll take the average)
mypi = compute_pi(samples) / nprocs
# MPI_Reduce collects all individual 
pi = comm.reduce(mypi, op=MPI.SUM, root=0)
if myrank == 0:
    error = abs(pi - numpy.pi)
    print("pi is approximately %.16f, error is %.16f" % (pi, error))

Client MPI.COMM_WORLD.Get_size()=1
Client engine ids [0, 1, 2, 3]
pi is approximately 3.1496400000000002, error is 0.0080473464102071


## Real Example

For those that don't consider Python a vialble HPC language, here's an example of what you can do with Python at scale:

<img src="../img/pyfr_logo.png" style="height:250px">


[PyFR](http://www.pyfr.org/index.php) - A Python framework for solving advection-diffucions problems.

Features:
* **Multi-platform**
    * AMD GPUs
    * NVIDIA GPUs
    * CPUs
    * Intel MIC
    * Even Raspberyy Pi  
<br>
* **Parallelism**
    * MPI (mpi4py)
    * CUDA (PyCUDA)
    * OpenMP (pyMIC for KNL)
    * OpenCL (PyOpenCL)
    * HDF5 Parallel I/O (h5py)  
<br>    
* **Scalable**
    * 18,000 K20X GPUs on Titan (ORNL)
    * 195 billion DOFs
    * 58% peak performance (Summit HPL benchmark was 71%)
    * SC16 Best Paper/Gordon Bell nominee
    
And it does all that in about **8000 lines of code**.

<img src="../img/pyfr-sim.gif" style="height:250px">

### Euler Demo (from PyFR website)

Here we'll run a small PyFR demo, a 2D Euler vortex simulation.  We have a separate Conda environment in our notebook for PyFR (select the pyfr kernel from the Kernel menu:

<img src="../img/conda-nb.png" style="height:600px">

**NOTE:** For this small example, runnning in parallel inside our notebook give pretty terrible performance.  I've written the commands 

In [1]:
!pyfr import ../code/pyfr/euler_vortex_2d.msh ../code/pyfr/euler_vortex_2d.pyfrm

We need to partition the mesh depending on how many processors we want to use:

In [2]:
!pyfr partition 4 ../code/pyfr/euler_vortex_2d.pyfrm ../code/pyfr/



In [3]:
!mpirun -np 4 pyfr run -b openmp -p ../code/pyfr/euler_vortex_2d.pyfrm ../code/pyfr/euler_vortex_2d.ini

[2K[G   0.0% [>                           ] 0.01/100.00 ela: 00:00:02 rem: 14:47:22[2K[G   0.0% [>                           ] 0.01/100.00 ela: 00:00:04 rem: 13:32:03[2K[G   0.0% [>                           ] 0.01/100.00 ela: 00:00:07 rem: 13:09:03[2K[G   0.0% [>                           ] 0.02/100.00 ela: 00:00:09 rem: 13:00:47[2K[G   0.0% [>                           ] 0.03/100.00 ela: 00:00:11 rem: 13:04:37[2K[G   0.0% [>                           ] 0.03/100.00 ela: 00:00:13 rem: 12:49:22^C
[mpiexec@023c8899762c] Sending Ctrl-C to processes as requested
[mpiexec@023c8899762c] Press Ctrl-C again to force abort
Traceback (most recent call last):
  File "/opt/conda/envs/pyfr/bin/pyfr", line 11, in <module>
    load_entry_point('pyfr==1.8.0', 'console_scripts', 'pyfr')()
  File "/opt/conda/envs/pyfr/lib/python3.7/site-packages/pyfr-1.8.0-py3.7.egg/pyfr/__main__.py", line 110, in main
  File "/opt/conda/envs/pyfr/lib/python3.7/site-packages/pyfr-1.8.0-py3.7.egg/pyfr/__main_

PyFR outputs in VTK format, 

In [13]:
import ipyparallel as ipp
c = ipp.Client()
view = c[:]
view.activate()
view.run('/opt/conda/envs/pyfr/bin run -b openmp -p ../code/pyfr/euler_vortex_2d.pyfrm ../code/pyfr/euler_vortex_2d.ini')
view

In [11]:
view = c[:]
view.activate()
view.run('/opt/conda/envs/pyfr/bin/pyfr')

<AsyncResult: execute>

# Cython


# Numba

Numba provides code compilation like Cython, but does so in a simple, easy-to-use fashion.  All that's needed is to add a decorator (similar to a C/C++ pragma or Fortran directive) to compute kernel

```python
from numba import jit

@jit
def func(x):
    # some loop or comutationally intensive kernel
    return x
```

Python code with the `@jit` decorator are compiled at runtime (just-in-time) using the LLVM compiler, producing code that is on par with C/C++ and Fortran.

Here's what it look like:
<img src="../img/numba_process.png" style="height:450px">

# Python at Pawsey

Pawsey has a number of solutions for Python users:

- Compiled Python modules (Versions 2&3)
- Tuned NumPy/SciPy libraries (linked agains MKL and Cray-LibSci)
- Job-Packing Methods
- Shifter/Singularity

### Job-Packing

Users of Magnus and Galaxy are allocated an entire node, and charged accordingly, whether they use it all or not.  Many users want to run as many single-core Python jobs on a node as possible.  The easiest way to do that is to use job-packing in your SLURM jobscript.

```bash
#!/bin/bash -l
#SBATCH --nodes=1
#SBATCH --ntasks=24
#SBATCH --ntasks-per-node=24
#SBATCH --cpus-per-task=1
#SBATCH --time=00:10:00
#SBATCH --partition=debugq
#SBATCH --account=pawsey0001
#SBATCH --export=NONE

module swap PrgEnv-crady PrgEnv-gnu
module load python
module load numpy
module load scipy
module load matplotlib

srun --export=ALL -n 24 -N 1 python_job_wrapper.sh
```

We run a single wrapper script across 24 cores.  The key is how we write our wrapper script:

```bash
#!/bin/bash

python voxelSlice.py qs-curie-${SLURM_PROCID}"
```

Each instance of the wrapper script will call the Python interpreter, but we use the environment variable `SLURM_PROCID` to differentiate between the cores, and each core takes a different input data set.

The benefit with this method is it usually require no changes to existing Python scripts, but may require some thought be given as to how to structure data inputs.

### Containers

Pawsey also has Docker images available to use, particularly for Python users.  We have a program called Shifter installed on our Cray systems.  It allows for Docker containers to be run on a shared HPC system, while still maintaining performance.

<img src="../img/Shifter_OSU_allgather.png" style="height:450px">

<img src="../img/Shifter_OSU_bandwidth_reduced.png" style="height:450px">

Job scripts require minimal modification:
    
```bash
#!/bin/bash
#SBATCH --nodes=1
#SBATCH --time=00:10:00
#SBATCH --image=docker:pawsey/hpc-python:latest
 
 
module load shifter
 
 
srun -n 24 shifter python my_python_app.py <args>
```

And the Docker images provide a base of what most users would need to build their own images:
    
```docker
FROM ubuntu:latest

LABEL maintainer="brian.skjerven@pawsey.org.au"

RUN apt-get update \
      && apt-get install -y \
      cython \
      python-minimal \
      python-pip

RUN pip install --upgrade pip \
      && pip install \
      astropy \
      h5py \
      matplotlib \
      nose \
      numpy \
      pytest \
      scipy \
      setuptools

CMD ["/bin/bash"]
```

The other benefit to using Python in a container is related to dynamic library loading:

<img src="../img/shifter_magnus.png" style="height:450px">

## Final Thoughts

- Lots of diferent ways of explointing parallelism in Python
    * Some better sutied to different workflows
- Make use of Pawsey compiled Python libraries (performance and module compatibility)
- Try to use MPI capable libraries
- Multiprocess *can* be useful, but there is a performance hit
- Other Python options available to users (Shifter, job-packing)