# Parallel Programming

**Author:** Nico Curti

**Course:** Software and Computing for Applied Physics - 87948

**Github:** [Nico-Curti](https://github.com/Nico-Curti)

The starting point is always:
    
    “Premature optimization is the root of all evil” (cit. Donald Knuth)

And, moreover:

    “Not all the tasks can be performed in parallel! A code parallelization must be your last choice since it is certainly 
    the harderst way in which optimize a code!” (cit. Nico Curti)

## The idea

The idea appears quite simple and it is the same of every best-practice approach to a problem:

* divide the problem into a series of simpler tasks
* give an order to these tasks
* if they are **independent** there are no reason to perform them one-by-one into a sequential mode, so you can (try to) parallelize them!

Languages and tools like `Make`, `CMake`, and `Snakemake` are based on this idea and they provide easy solution to address the parallelism.

$$\uparrow$$ 

    if you are not well confident about your programming skills they are certainly the best approach

The approach is quite close also to the *Map-Reduce* paradigm of the Functional programming

```python
import numpy as np

a = np.arange(10)
for i in range(10):
    a[i] += 1
```

All these tasks are totally independent, so they can be potentially performed in parallel!

## Parallelism in Python

Python provides a series of libraries dedicated to the parallel programming without any external dependency.

The most famous one is [multiprocessing](https://docs.python.org/3/library/multiprocessing.html)

**NOTE:** 

The use of this library has several issues in the IPython console and under Windows OS it requires the inclusion of a "main" 

```python
def func (x):
    return x

if __name__ == '__main__':
    y = func(2)
    print(y)
```

**NOTE 2**:

If you want to use this library into an IPython console (like me, using the current Jupyter notebook!) you can easily move to the `multiprocess` counter part, which is a simple wrap of the standard library.

Jupyter notebooks don't work with `multiprocessing` because the module pickles (serialises) data to send to processes.
`multiprocess` is a fork of multiprocessing that uses dill instead of pickle to serialise data which allows it to work from within Jupyter notebooks. 
The API is identical!

```bash
python -m pip install multiprocess
```

In [4]:
import numpy as np
from time import time as now

N = 100_000
a = np.arange(N)
tic = now()

for i in range(N):
    a[i] *= 2
toc = now()
print(f'Elapsed time: {toc - tic:.6f} sec')
print(a[:10])

Elapsed time: 0.022000 sec
[ 0  2  4  6  8 10 12 14 16 18]


In [10]:
import multiprocess

N = 100_000
a = np.arange(N)

# get the number of possible threads
nth = multiprocess.cpu_count()
print(f'We are using {nth:d} threads in parallel')

tic = now()
with multiprocess.Pool(nth) as pool:
    a = pool.map(lambda x : x*2, a)
toc = now()

print(f'Elapsed time: {toc - tic:.6f} sec')
print(a[:10])

We are using 24 threads in parallel
Elapsed time: 2.664999 sec
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


## Not a so good result...

Using 24 parallel processes we have a more than 100x slower code!

## Why?

* The creation of multiple parallel processes is a very expensive task in terms of computation.

* If there are other processes in backend (very common situation in a personal computer!) the split of the job is not so easy

* Despite the great number of iterations, the task is not particularly intense from a computational point of view, so we are loosing more time in the job orchestration than in the computational task!

* Last but not least, the multiprocessing in Python is a sort of fake...

## Let's try with a more complex example

We will use one old friend to this purpose...

In [11]:
import numpy as np

N = 10000 # number of MC events
N_run = 1000 # number of runs
pi = np.zeros(N_run) # values of pi
tic = now()
for I in range(N_run):
    Nhits = 0.0
    for i in range(N):
        x = np.random.rand() * 2 - 1
        y = np.random.rand() * 2 - 1
        distance = x*x + y*y
        if distance < 1:
            Nhits += 1.0
    pi[I] = 4. * Nhits / N
        
toc = now()
print('pi with', N, 'steps for', N_run, 
      'runs is', np.mean(pi), 'in', toc-tic, 'sec'
     )

pi with 10000 steps for 1000 runs is 3.1409284 in 8.518996000289917 sec


In the Metropolis algorithm, all the evaluation of the different PI computation are totally independent, so it could be a good candidate for a parallel approach

Let's move the computation into a function

In [20]:
def Metropolis (I):
    Nhits = 0.0
    N = 10000 # number of MC events
    for i in range(N):
        x = np.random.rand() * 2 - 1
        y = np.random.rand() * 2 - 1
        distance = x*x + y*y
        if distance < 1:
            Nhits += 1.0
    return 4. * Nhits / N

Now we can split the different evaluation of the Metropolis estimation into a series of independent tasks

In [21]:
tic = now()
with multiprocess.Pool(nth) as pool:
    pi = pool.map(Metropolis, range(N_run))
toc = now()
print('pi with', N, 'steps for', N_run, 
      'runs is', np.mean(pi), 'in', toc-tic, 'sec'
     )

NameError: name 'np' is not defined

## WTF?!

I have already imported `numpy` package!

The "process" created by the library is a sort of multiple runs of "independent" scripts, like

```bash
$ python run_iteration1.py
$ python run_iteration2.py
$ ...
$ python run_iterationN.py
```

A possible workaround

In [27]:
def Metropolis (I):
    import numpy as np
    
    Nhits = 0.0
    N = 10000 # number of MC events
    for i in range(N):
        x = np.random.rand() * 2 - 1
        y = np.random.rand() * 2 - 1
        distance = x*x + y*y
        if distance < 1:
            Nhits += 1.0
    return 4. * Nhits / N

In [29]:
tic = now()
with multiprocess.Pool(nth) as pool:
    pi = pool.map(Metropolis, range(N_run))
toc = now()
print(f'We are using {nth:d} threads in parallel')
print('pi with', N, 'steps for', N_run, 
      'runs is', np.mean(pi), 'in', toc-tic, 'sec'
     )

We are using 24 threads in parallel
pi with 10000 steps for 1000 runs is 3.1419528 in 1.2859978675842285 sec


Some **NOTE**:

* The sequential computation of the Metropolis took 8.5 sec
* The paralle computation of the Metropolis took 1.3 sec
* The global speed up is **6.5x**

* We have obtained a 6.5x gain using 24x computational power... not so good

## Change the loop

In our algorithm there are 2 nested loops... what happens moving the parallelization into the inner loop?

In [42]:
N = 10000 # number of MC events
N_run = 100 # number of runs
pi = np.zeros(N_run) # values of pi

def innerMetropolis (i):
    import numpy as np
    x = np.random.rand() * 2 - 1
    y = np.random.rand() * 2 - 1
    distance = x*x + y*y
    return distance < 1

tic = now()
for I in range(N_run):
    with multiprocess.Pool(nth) as pool:
        Nhits = sum(pool.map(innerMetropolis, range(N)))
    pi[I] = 4. * Nhits / N
    
toc = now()

print(f'We are using {nth:d} threads in parallel')
print('pi with', N, 'steps for', N_run, 
      'runs is', np.mean(pi), 'in', toc-tic, 'sec'
     )

We are using 24 threads in parallel
pi with 10000 steps for 100 runs is 3.1452480000000005 in 48.4399356842041 sec


## Main rule of code optimization

To get the best performances of your code:

1. Parallelize the outer loop
2. Vectorize the inner loop(s)

## Another example

Let's try another old friend

In [43]:
import numpy as np
from scipy.spatial.distance import euclidean

def point_distance (N):
    pt = np.random.uniform(0, 1, size=(N, 2))    
    for i in range(N):
        for j in range(N):
            x1, y1 = pt[i]
            x2, y2 = pt[j]
            d = euclidean([x1, y1], [x2, y2])

%timeit point_distance(N=1000)

6.63 s ± 34.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Optimize it with parallel programming

In [45]:
def point_distance (i, pt):
    import numpy as np
    from scipy.spatial.distance import euclidean
    
    for j in range(len(pt)):
        x1, y1 = pt[i]
        x2, y2 = pt[j]
        d = euclidean([x1, y1], [x2, y2])

In [48]:
N = 1000
pt = np.random.uniform(0, 1, size=(N, 2))    

tic = now()
with multiprocess.Pool(nth) as pool:
    for i in range(N):
        pool.apply_async(point_distance, args=(i, pt))
toc = now()
print(f'Elapsed time: {toc - tic:.3f} sec')

Elapsed time: 0.120 sec


## Can we do it better?

* Try to optimize the inner loop with the vectorization

* Try to balance the work of different threads

* Try to **increase** the work of the threads!

In [49]:
from itertools import product

list(product(range(3), range(3)))

[(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2), (2, 0), (2, 1), (2, 2)]

In [54]:
def point_distance (pi, pj):
    from scipy.spatial.distance import euclidean
    d = euclidean(pi, pj)

In [56]:
N = 1000
pt = np.random.uniform(0, 1, size=(N, 2))    

tic = now()
with multiprocess.Pool(nth) as pool:
    for (i, j) in product(range(N), range(N)):
        pool.apply_async(point_distance, args=(pt[i], pt[j]))
toc = now()
print(f'Elapsed time: {toc - tic:.3f} sec')

Elapsed time: 21.882 sec


## This sounds counterintuitive!

The cost for the creation of new threads in Python is **very** expensive and the increment of parallelism doesn't provide a real gain with high-level languages!

## Parallelism in low-level

OpenMP is an Application Program Interface (API) that provides a portable, scalable model for developers of shared memory parallel applications. 
OpenMP works with Symmetric Multiprocessing (SMP) The API supports C/C++ and Fortran on a wide variety of architectures.

[Official Guide](https://www.openmp.org/resources/refguides/)

[Simple Guide](https://curc.readthedocs.io/en/latest/programming/OpenMP-C.html)

The parallelism is given by the insertion of pre-processor directives identified by the syntax:

```cpp
#include <omp.h>

#pragma omp
```

## Standard program

```cpp
#include <iostream>
#include <omp.h>

int main(){
    std :: cout << "Hello from process: " 
                << omp_get_thread_num() 
                << std :: endl;
    return 0;
}
```

```bash
$ g++ hello_world.cpp -o hello_world -fopenmp
$ ./hello_world
Hello from process: 0
```

## Parallel program

```cpp
#include <iostream>
#include <omp.h>

int main(){
    #pragma omp parallel
    {
        std :: cout << "Hello from process: " 
                    << omp_get_thread_num() 
                    << std :: endl;
    }
    return 0;
}
```

```bash
$ g++ parallel_hello_world.cpp -o parallel_hello_world -fopenmp
$ ./parallel_hello_world
Hello from process: 3
Hello from process: 0
Hello from process: 2
Hello from process: 1
```

## Memory Management

Memory management is a quintessential component of any parallel program that involves data manipulation.

* **Private** types create a copy of a variable for each process in the parallel system.
* **Shared** types hold one instance of a variable for all processes to share.

```cpp
#pragma omp shared(shar_Var1) private(priv_Var1, priv_Var2)
```

```cpp
#include <iostream>
#include <omp.h>

int main(){
    int tid;
    #pragma omp parallel
    {
        tid = omp_get_thread_num();
        std :: cout << "Hello from process: " 
                    << tid
                    << std :: endl;
    }
    return 0;
}
```

```bash
Hello from process: 3
Hello from process: 2
Hello from process: 2
Hello from process: 2
```

```cpp
#include <iostream>
#include <omp.h>

int main(){
    int tid;
    #pragma omp parallel private(tid)
    {
        tid = omp_get_thread_num();
        std :: cout << "Hello from process: " 
                    << tid
                    << std :: endl;
    }
    return 0;
}
```

```bash
Hello from process: 3
Hello from process: 0
Hello from process: 2
Hello from process: 1
```

# Concurrency

Strictly related to the memory management "problems" there is the idea of concurrency.

Since the variables are shared (as default) between the processes, the possibility to "touch" the same variables at the same time is not negligible, causing a *concurrency issue*.

The management of several workers requires the introduction of an **orchestrator** who poses *barriers* and *critial directives*.

```cpp
#include <omp.h>

int main () {
    int partial_Sum, total_Sum;

    #pragma omp parallel private(partial_Sum) shared(total_Sum)
    {
        partial_Sum = 0;
        total_Sum = 0;

        #pragma omp for
        {
            for(int i = 1; i < 1000; ++i){
                partial_Sum += i;
            }
        }
    }
    return 0;
}
```

Each thread initializes its own copy of the `partial_Sum` variable with a value of zero.

Then, inside a parallel loop (each thread takes a part of the 1000 possible iterations!), each thread increments its own variable.

To compute the total sum we need to put a "critical barrier" at the end, saying:

    In this part can enter a thread one-by-one (Safe region)

```cpp
#include <omp.h>

int main () {
    int partial_Sum, total_Sum;

    #pragma omp parallel private(partial_Sum) shared(total_Sum)
    {
        partial_Sum = 0;
        total_Sum = 0;

        #pragma omp for
        {
            for(int i = 1; i < 1000; ++i){
                partial_Sum += i;
            }
        }
        //Create thread safe region.
        #pragma omp critical
        {
            //add each threads partial sum to the total sum
            total_Sum += partial_Sum;
        }
    }
    return 0;
}
```

Another possibility in this cases is given by the **reduction** clause.

The idea is always the same but, since the *Map-Reduce* paradigm is quite standard, OpenMP provides a ready-to-use feature for these cases.

`
reduction (op : list)
`

```cpp
#include <omp.h>

int main () {
    int total_Sum = 0;

    #pragma omp parallel for reduction (+ : total_Sum)
    for (int i = 0; i < 1000; ++i) {
        total_Sum += i;
    }

    return 0;
}
```

## Example

Parallel Metropolis Algorithm

```cpp
#include <random>
#include <iostream>
#include <numeric>
#include <iomanip>
#include <omp.h>
#define NUM_THREADS omp_get_max_threads()

int main()
{
	std :: random_device rd;
	std :: mt19937 eng{rd()};
	std :: size_t i, I, N = 1e5, N_run = 1e4, Nhits = 0;
	long double x, y, res, pi = 0.0, pi_tmp = 0.0;
    
	std :: uniform_real_distribution < long double > uniform_dist{-1., 1.};
    
    omp_set_num_threads(NUM_THREADS);
	
	long double start_time = omp_get_wtime();
```

```cpp
    
    #pragma omp parallel private (I, Nhits)
	{
		#pragma omp for reduction (+ : pi_tmp)
		for (I = 0; I < N_run; ++I)
		{
			Nhits = 0;
			#pragma omp parallel private (i, x, y, res)
			{
				#pragma omp for reduction (+ : Nhits)
				for (i = 0; i < N; ++i)
				{
					x = uniform_dist(eng);
					y = uniform_dist(eng);
					res = x*x + y*y;
					if (res < 1)
						Nhits += 1;
				}
			}
			pi_tmp += (4.0 * ((long double)Nhits/(long double)N));
		}
	}
```

```cpp

    pi = pi_tmp / N_run;
	long double run_time = omp_get_wtime() - start_time;
	std :: cout << "pi with " << N << " steps for " << N_run << " runs is " << std :: set_precision(16) << pi << " in " << run_time << " sec" << std :: endl;

	return 0;
}
```

The same idea can be applied also considering different computer as "workers", moving to a *multithreading* paradigm to a *distributed* paradigm.

Formally this can be done moving from *OpenMP* to *MPI* directives (with some edits and features!)

## Recursive Approach

There are several algorithms that can be better represented as recursive functions.

In [58]:
def fibonacci(n):
    if n <= 1:
        return n
    else:
        return(fibonacci(n-1) + fibonacci(n-2))

nterms = 10
print('Fibonacci sequence:')
for i in range(nterms):
    print(fibonacci(i))

Fibonacci sequence:
0
1
1
2
3
5
8
13
21
34


The possibility to re-call the same function can cause infinite loops if not carefully managed but it could be also the most elegant way to represent a tree scheme of computing.

The same approach is used also by **Metaprogramming** paradigm: if you are interested on this topic a very interesting example is given in the [PYTHON-easyDag](https://github.com/eDIMESLab/easyDAG) developed by Prof. Giampieri (and in its c++ counter part of my project [CPP-easyDag](https://github.com/Nico-Curti/easyDAG)).

The paradigm is always the same:

1. split the job into a series of sub-jobs
2. repeat the step 1 until a stop criteria
3. perform the computation
4. merge together the intermediate results until the root

## Is it possible to work with multiple threads in this scheme?

As for the `omp parallel for` directive, there is also the possibility to manage the threads according to independent `task` and `section`

```cpp
#pragma omp parallel sections
{
    #pragma omp section
    {
        Task 1
    }
    #pragma omp section
    {
        Task 2
    }
}
```

OpenMP sections are **static**, that is, they are good if you know, when you are writing the program, how many of them you will need.

```cpp
omp_set_num_threads ( 2 );
#pragma omp parallel default (none)
{
    #pragma omp task
    fprintf( stderr, "A\n" );
    
    #pragma omp task
    fprintf( stderr, "B\n" );
}
```

Tasks are very much like OpenMP **sections**, but sections are static, that is, the number of sections is set when you write the code, whereas **tasks** can be created anytime, and in any number, under control of your program's logic.

## Merge-sort example

Let's start from the Python version to better clarify the idea behind the algorithm

In [60]:
def mergeSort(arr):
    if len(arr)<=1:
        return arr
    mid = len(arr) // 2
    left = mergeSort(arr[:mid])
    right = mergeSort(arr[mid:])
    
    return merge(left, right)

In [61]:
def merge(arr1, arr2):
    i, j = 0, 0
    result = []
    while (i < len(arr1) and j < len(arr2)):
        if arr2[j] > arr1[i]:
            result.append(arr1[i])
            i += 1
        else:
            result.append(arr2[j])
            j += 1
    while (i < len(arr1)):
        result.append(arr1[i])
        i += 1
    while (j < len(arr2)):
        result.append(arr2[j])
        j += 1
    return result

In [67]:
arr = [12, 11, 13, 5, 6, 7]
print('Given array is:  ', arr)
arr = mergeSort(arr)
print('Sorted array is: ', arr)

Given array is:   [12, 11, 13, 5, 6, 7]
Sorted array is:  [5, 6, 7, 11, 12, 13]


## Move to a more advanced implementation

The idea is to split the work among all the possible threads, creating a tree of instruction.

Each thread will perform the analysis of a small chunk of the array, producing the sorting of its slice in **serial** mode (!).

The result of the entire array will be merged together, going back along the tree.

```cpp
void mergesort_serial (float * a, int start, int end)
{
  if ((end - start) == 2) {
    if (a[start] < a[end - 1]) {
      return;
    } else {
      std :: swap(a[start], a[end - 1]);
      return;
    }
  }
  const int pivot = start + ((end-start) >> 1);

  if ((end - start) < 100) { // minimum sorting size
    std :: sort(a + start, a + end);
    return;
  }  else  {
    mergesort_serial(a, start, pivot, ord);
    mergesort_serial(a, pivot, end, ord);
  }
  std :: inplace_merge(a + start, a + pivot, a + end, ord);

  return;
}
```

```cpp
void mergesort_parallel_omp (float * a, int start, int end, int threads)
{
  const int pivot = start + ((end - start) >> 1);

  if (threads <= 1) {
    mergesort_serial(a, start, end);
    return;
  } else {
#pragma omp task shared (start, end, threads)
    {
      mergesort_parallel_omp(a, start, pivot, threads >> 1);
    }
#pragma omp task shared (start, end, threads)
    {
      mergesort_parallel_omp(a, pivot, end, threads - (threads >> 1));
    }
#pragma omp taskwait
  }

  std :: inplace_merge(a + start, a + pivot, a + end);

  return;
}
```

```cpp
void sort (float * a, int start, int end)
{
  const int ths = omp_get_num_threads();
  const int nth = (ths % 2) ? ths - 1 : ths;
  const int diff = end % nth;
  const int size = diff ? end - diff : end;

  #pragma omp single
  #pragma omp taskgroup
  {
    mergesort_parallel_omp(a, start, size, nth);
  } // end single section

  if (diff)
  {
    std :: sort(a + size, a + end);
    std :: inplace_merge(a + start, a + size, a + end);
  }

  return;
}
```

## Other possibility of parallelism

There is also another possibility to split your code-work among different processes, called **asynchronous programming**.

This is probably the most common approach in standard programming and one of the most powerful (and sometimes hardest) one.

We will see some examples about it in the next application.