# Parallel Processing - a Brief Introduction

This is a short introduction to parallel processing. It focuses on developing solutions to **"embarassingly parallel"** problems using **python** and **R**.

The structure of this introduction is to *evolve* an implementation from a basic standalone program through variants (different versions), using unix shell scripts and some of the programming features and libraries of the **python** and **R**, into programs that exploit systems that provide multiple processors.

The outcome of working through and studying these variants is to familiarise the reader with some of the aspects of parallel programming and to provide a collection of *recipes* that can be adapted and reused on systems such as **storm2** and the **virtual linux machines**. 

## Background - Computing the Matrix Exponential

The example problem chosen to introduce the reader to transforming a simple program into one suitable for parallelisation is that of computing the [matrix exponential](https://en.wikipedia.org/wiki/Matrix_exponential).


## Version 1 - Basic Implementation

There are various reasonably efficient ways of computing the matrix exponential, although finding a general method that is both reliable and accurate is an open research problem in numerical analysis. However, our example is based on a simple "brute force" approach which might be used to study the rate of convergence of the terms in the standard series representation. This is convenient for studying parallelisation as the computational complexity of determining the first $m$ terms of the series for a $n$ by $n$ matrix is $\cal{O}(n^3m)$. This provides us with both a linear and a super linear parameter to experiment with and control the run time of the different versions of the code.

#### python

```python
import numpy as np

np.random.seed(0)

n = 100
m = 10

X = np.random.rand(n,n)
Z = np.identity(n)
Y = Z

for k in range(1,m) :
    Z = np.matmul(Z,X)/k
    Y = np.add(Y,Z)
```

This code is available in the accompanying file matrix.exponential.v1.py and can be executed using

```bash
$ python matrix.exponential.v1.py 
```

Alternatively, you can just start a python interpreter and copy the code into the REPL.

#### R

```R
set.seed(0)

n <- 10
m <- 10

X <- matrix(runif(n^2),n,n)
Z <- diag(n)
Y <- Z

for(k in 2:m)
{
  Z <- Z%*%X/(k-1)
  Y <- Y + Z
}
```

This code is available in the accompanying file matrix.exponential.v1.R and can be executed using

```bash
$ Rscript matrix.exponential.v1.R 
```

Alternatively, you can just start R and copy the code into the REPL.

## Version 2 - Adding command line arguments

The second version of the code allows the parameters $n$ and $m$ of the problem be provided at the command line and the code is made reproducible by adding a random seed. Note that the arguments passed to the programs are interpreted as strings so have to be converted to a suitable type for use in the programs. 

The purpose of adding the arguments to the command line is that, later on, using the program with a range of arguments is automated.

*5Rs - Reproducability*

#### python

```python
import numpy as np
import sys

n = int(sys.argv[1])
m = int(sys.argv[2])
s = int(sys.argv[3])

np.random.seed(s)

X = np.random.rand(n,n)
Z = np.identity(n)
Y = Z

for k in range(1,m) :
    Z = np.matmul(Z,X)/k
    Y = np.add(Y,Z)
```

This code is available in the accompanying file matrix.exponential.v2.py and can be executed using (for example)

```bash
$ python matrix.exponential.v2.py 1000 10 1 
```

#### R

```R
args <- commandArgs(trailingOnly=TRUE)

n <- as.numeric(args[1])
m <- as.numeric(args[2])
s <- as.numeric(args[3])

set.seed(s)

X <- matrix(runif(n^2),n,n)
Z <- diag(n)
Y <- Z

for(k in 2:m)
{
  Z <- Z%*%X/(k-1)
  Y <- Y + Z
}
```

This code is available in the accompanying file matrix.exponential.v2.R and can be executed using (for example)

```bash
$ Rscript matrix.exponential.v2.R 1000 10 1 
```

## Version 3 - Adding Serialisation

Serialisation is computer science speak for saving and loading variables used in program for later analysis / use. 

*5Rs - Reproducability*

#### python

```python
import numpy as np
import sys

n = int(sys.argv[1])
m = int(sys.argv[2])
s = int(sys.argv[3])
stem = str(sys.argv[4])

np.random.seed(s)

X = np.random.rand(n,n)
Z = np.identity(n)
Y = Z

for k in range(1,m) :
    Z = np.matmul(Z,X)/k
    Y = np.add(Y,Z)


fileX = stem + ".X.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)
fileY = stem + ".Y.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)


np.save(fileX,X)
np.save(fileY,Y)

```

This code is available in the accompanying file matrix.exponential.v3.py and can be executed using (for example)

```bash
$ python matrix.exponential.v3.py 1000 10 1 matrix.exponential
```

where **matrix.exponential** is (a string) used to create a common **stem** for the file name used to store the data.

#### R

```R
args <- commandArgs(trailingOnly=TRUE)

n <- as.numeric(args[1])
m <- as.numeric(args[2])
s <- as.numeric(args[3])
stem <- args[4]

set.seed(s)

X <- matrix(runif(n^2),n,n)
Z <- diag(n)
Y <- Z

for(k in 2:m)
{
  Z <- Z%*%X/(k-1)
  Y <- Y + Z
}


fileX <- paste(stem,".X.n=",n,".m=",m,".s=",s,".RData",sep="")
fileY <- paste(stem,".Y.n=",n,".m=",m,".s=",s,".RData",sep="")


save(X,file=fileX)
save(Y,file=fileY)
```

This code is available in the accompanying file matrix.exponential.v3.R and can be executed using (for example)

```bash
$ Rscript matrix.exponential.v3.R 1000 10 1 matrix.exponential
```

where **matrix.exponential** is (a string) used to create a common **stem** for the file name used to store the data.

## Version 4 - Separating the science from the engineering

When evolving a program to work in an automated fashion on a multiprocessor system there can be a lot of "engineering" involved. For example, processing command line arguments, producing log files, handling errors, and so on. If this "engineering code" is combined directly with the "scientific code" it obscures what the "science" is and "how" it is being done. It is important to separate these concerns as early on in the process as possible.  

*5Rs - Replicability*

#### python

```python
import numpy as np
import sys


def matexp(X,m) :
    n,_ = np.shape(X)
    Z = np.identity(n)
    Y = Z
    for k in range(1,m) :
        Z = np.matmul(Z,X)/k
        Y = np.add(Y,Z)
    return Y
    

n = int(sys.argv[1])
m = int(sys.argv[2])
s = int(sys.argv[3])
stem = str(sys.argv[4])

np.random.seed(s)

X = np.random.rand(n,n)
Y = matexp(X,m)


fileX = stem + ".X.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)
fileY = stem + ".Y.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)


np.save(fileX,X)
np.save(fileY,Y)


```

#### R

```R
matexp <- function(X,m)
{
  n <- dim(X)[1]
  Z <- diag(n)
  Y <- Z
  for(k in 2:m)
  {
      Z <- Z%*%X/(k-1)
      Y <- Y + Z
  }
return(Y)
}

args <- commandArgs(trailingOnly=TRUE)

n <- as.numeric(args[1])
m <- as.numeric(args[2])
s <- as.numeric(args[3])
stem <- args[4]

set.seed(s)

X <- matrix(runif(n^2),n,n)
Y <- matexp(X,m)


fileX <- paste(stem,".X.n=",n,".m=",m,".s=",s,".RData",sep="")
fileY <- paste(stem,".Y.n=",n,".m=",m,".s=",s,".RData",sep="")


save(X,file=fileX)
save(Y,file=fileY)
```

## Version 5 - Using multiple source files

Version 5 extends version 4 by further separation of the engineering and science by using separate files. The matrix exponential is now in the file matrix.exponential.py or matrix.exponential.R

#### matexp.py

```python
import numpy as np
import sys

n = int(sys.argv[1])
m = int(sys.argv[2])
s = int(sys.argv[3])

np.random.seed(s)

X = np.random.rand(n,n)
Z = np.identity(n)
Y = Z

for k in range(1,m) :
    Z = np.matmul(Z,X)/k
    Y = np.add(Y,Z)
```

#### python

```python
from matexp import matexp
import numpy as np
import sys

n = int(sys.argv[1])
m = int(sys.argv[2])
s = int(sys.argv[3])
stem = str(sys.argv[4])

np.random.seed(s)

X = np.random.rand(n,n)
Y = matexp(X,m)


fileX = stem + ".X.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)
fileY = stem + ".Y.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)


np.save(fileX,X)
np.save(fileY,Y)

```

#### matrix.exponential.R

```R
matexp <- function(X,m)
{
  n <- dim(X)[1]
  Z <- diag(n)
  Y <- Z
  for(k in 2:m)
  {
      Z <- Z%*%X/(k-1)
      Y <- Y + Z
  }
return(Y)
}
```

#### R

```R
source("matrix.exponential.R")

args <- commandArgs(trailingOnly=TRUE)

n <- as.numeric(args[1])
m <- as.numeric(args[2])
s <- as.numeric(args[3])
stem <- args[4]

set.seed(s)

X <- matrix(runif(n^2),n,n)
Y <- matexp(X,m)


fileX <- paste(stem,".X.n=",n,".m=",m,".s=",s,".RData",sep="")
fileY <- paste(stem,".Y.n=",n,".m=",m,".s=",s,".RData",sep="")


save(X,file=fileX)
save(Y,file=fileY)
```

## Parallelisation via a Unix shell

Thus far non of the examples have been run in parallel. However, having now added facilities for processing command line arguments and serialised the outputs it is trivial matter to do this using a command shell. To do so, just open up multiple shells and run the program with different arguments in each shell. Voila - parallelisation !!

However, it is possible to do better.



### A simple for loop

#### shell script 

```bash
#!/bin/bash
for i in {1..5}
do
    echo "Welcome $i times"
done
```

The above example is available in the accompanying file for-loop-example-1.sh You can run it at the command line using

```bash
$ source for-loop-example-1.sh
```

### Running a program multiple times using a simple for loop

The basic shell based for loop can now be used to execute your program multiple times.

#### shell script

```bash
#!/bin/bash
for i in {1..5}
do
    echo "running n=2000 m = 10 s = $i"
    python matrix.exponential.v5.py 2000 10 ${i} mat.exp.test
done
```

You can run this by copying the file for-loop-example-2.sh into the same directory as your python code for version 5.

```bash
$ source for-loop-example-2.sh
```

However, this does not seem very "parallel" !!

Note - you will need to modify the shell scripts to run the R versions. This is left as an excercise for the reader !!!

### Running a program multiple times using a simple for loop - take 2

#### shell script

```bash
#!/bin/bash
for i in {1..5}
do
    echo "running n=1000 m = 10 s = $i"
    python matrix.exponential.v5.py 2000 10 ${i} mat.exp.test &
done
```

The difference here is the **&**. Its effect is to run the command "in the background". The shell (running the shell commands in the shell script) and the other commands it starts (your program) are now running **asynchronously**.

Voila - parallelisation !! 

Are you impressed ?

## Some Issues

### *Hang on een minuut !!*

Hmmm. That was not what quite what I was expecting !!! And by the way, there is something else I have noticed ...

### Issue 1 - processes and threads. (See white board for details)

**Conclusion.** If your code uses a library, system, or software tool / package that is multithreaded, you may need to control the number of threads it has access to. There are several ways of doing this, but the most common reliable way of doing it is by specifying the maximum number of threads to use (per process) using the shell (from which you start your parallel jobs). 

```bash
$ export OMP_NUM_THREADS=<val>
```
where <val> is the maximum number of threads to use (per process). E.g
```bash
$ export OMP_NUM_THREADS=1
```
will effectively turn multi threading off.

**Note**. In scientific computing, it is **very uncommon** to set **OMP_NUM_THREADS** to a value greater than the number of physical cores on your system. Doing so is is typically inefficient and usually results in your code slowing down !! 

It is **very common** for systems like webservers and data base systems to allocate more threads than there are available processors - but this is for some very specific reasons (non of which are to do with scientific computing). 

### Issue 2 - What happens when I log out !!

The parallelisation introduced so far is actually pretty close to the way things are done in practice, all be it typically automated by a job scheduler when working on a shared system. 

However, when jobs run for a long time, you might need to log out (or lose the connection to my remote system for some reason) before the jobs are finished. Depending on what system you are using, this may mean that your shell (and all the jobs you started using this shell) will be killed (this is actually the correct term). This can be bad.

To overcome this problem you can run your shell scripts in a slightly different way.  

```bash
$ nohup source for-loop-example-3.sh
```

This command runs your shell script in a way that it does not get killed when you "hang up". Note - **nohup** is short for "no hang up" and harks back to the days when most people connected to remote systems using a dial up modem.

## Basic job management

What happens if you have started some jobs and want to stop them before they are finished ?

### Identifying your processes

Each process you start has a unique id called the procecess ID, or **pid** for short. To see all of your running jobs and programs along with their associated pid you can use the **ps** command.

```bash
$ ps -ef 
```

However, this lists all of the processes running on the system !! You can filter this to your own processes by using 

```bash
$ ps -ef | grep "<username>"
```

where <username> is your user name.

Once you have identified your processes **pid** you can **kill** it.

```bash
$ kill -9 <pid>
```

where pid is your processes pid. The -9 forces **kill** to be aggressive when killing your job. For example, it will kill all of the processes children as well !!

There are various other commands you can use to track your processes, for example **top**.

## Version 6

Back to our code. The next version uses system commands from within your program to discover the process id and print it out. This can help you track your processes in the future.

#### python

```python
from matexp import matexp
import numpy as np
import sys
import os
from datetime import datetime

pid = os.getpid()
print("pid is : ",pid)
now = datetime.now()
print("start time : ",now)  


n = int(sys.argv[1])
m = int(sys.argv[2])
s = int(sys.argv[3])
stem = str(sys.argv[4])

np.random.seed(s)

X = np.random.rand(n,n)
Y = matexp(X,m)


fileX = stem + ".X.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)
fileY = stem + ".Y.n=" + str(n) + ".m=" + str(m) + ".s=" + str(s)


np.save(fileX,X)
np.save(fileY,Y)

now = datetime.now()
print("end time : ",now) 

```

#### R

```R
source("matrix.exponential.R")

args <- commandArgs(trailingOnly=TRUE)

n <- as.numeric(args[1])
m <- as.numeric(args[2])
s <- as.numeric(args[3])
stem <- args[4]

pid <- as.character(Sys.getpid())
now <- as.character(Sys.time())

cat("pid i : ",pid,'\n')
cat("start time : ",now,'\n')


set.seed(s)

X <- matrix(runif(n^2),n,n)
Y <- matexp(X,m)


fileX <- paste(stem,".X.n=",n,".m=",m,".s=",s,".RData",sep="")
fileY <- paste(stem,".Y.n=",n,".m=",m,".s=",s,".RData",sep="")


save(X,file=fileX)
save(Y,file=fileY)


now <- as.character(Sys.time())
cat("end time : ",now,'\n')
```

Note that the "logging" information (pid. start time/date, etc) are not saved to a file but just printed out. Of course, you could save this information to a file, but for basic logging information it is possible to do this from the shell script.

#### shell script

```bash
#!/bin/bash
for i in {1..5}
do
    echo "running n=2000 m = 10 s = $i"
    python matrix.exponential.v6.py 2000 10 ${i} mat.exp.test > log.${i}.out &
done
```

## Running parallel jobs from within python or R

So far, parallel jobs have been managed via the shell using basic shell facilities. An alternative is to use the features of programming environment to create new processes to run subsections of your code. This has the advantage that these libraries usually provide associated facilities to "marshall" data structures form your program to new processes. This removes the need to keep "serialising" data to and from (usually temporary) files which can be time consuming to implement and difficult to maintain, particularly across different systems.

Note though, that "underneath the bonnet" these facilities are still using a basic shell to create and manage processes and create their own temporary files to marshall data between the processes and your main program.  

*5Rs - Re-usability*

## Version 7

The basic python facility for parallelising code is asynchronous. There is currently no direct equivalent of this in R.

#### python

```python
from matexp import matexp
import numpy as np
from multiprocessing import Pool

n = 2000
m = 10
s = 0

np.random.seed(s)

pool = Pool(2)

X = np.random.rand(n,n)
print("starting Y1")
H1 = pool.apply_async(matexp,(X,m))

X = np.random.rand(n,n)
print("starting Y2")
H2 = pool.apply_async(matexp,(X,m))

print("waiting for Y1")
Y1 = H1.get()

print("waiting for Y1")
Y2 = H2.get()

```

Notice that you have to write code to manage the asynchronous behavior. The example uses a pool to create processes and then allocates a function and data to them. The immediate result of this is a "handle" to the (future) result of the operation. The Handles can then be processed to obtain the results when they become available. 

This asynchronous approach may seem a bit clumsy, but it has the advantage that, with clever (and complicated) additional code you can arrange it so that the different processes can share information and communicate with each other. In other words, you use an asynchronous library to allow you to write your own bespoke synchronisation code.

Writing synchronisation code is hard to do and very difficult to generalise. For this reason, most programmers try and avoid the need for synchronisation all together. 

## Version 8

The python and R languages provide synchronisation code for the trivial case of no communication or dependencies between processes in their parallel libraries. Even for this trivial case there are different ways of doing things, but this introduction looks at only one of them. 

For the case considered here there is only one function being used to do the parallelisation and it is run with a list of different argument parameters.

#### python

```python
from matexp import matexp
import numpy as np
from multiprocessing import Pool

n = 2000
m = 100
s = 0

np.random.seed(s)

pool = Pool(2)

X1 = np.random.rand(n,n)
X2 = np.random.rand(n,n)

print("starting Y1 and Y2")
result = pool.starmap(matexp,[(X1,m),(X2,m)])


```

#### R

```R
source("matrix.exponential.R")

library(parallel)

n <- 2000
m <- 100
s <- 0

set.seed(0)

X1 <- matrix(runif(n^2),n,n)
X2 <- matrix(runif(n^2),n,n)

# detectCores()

result <- mclapply(list(X1,X2),matexp,m,mc.cores=4)
```

Notice that in the R version, only one of the parameters can be varied. It is possible to use fairly simple functional programing techniques to overcome this limitation, but these are not covered here.

## When things go wrong - using exceptions

There are many good reasons for using the parallel facilities provided by python and R. However, there are certain limitations you need to be aware of. In particular, what happens if one of your processes fails ? Reasons for this happening include (but are not limited to) bugs in your code that only become a problem for certain parameter values, or problems with the system e.g. running out of memory.

When one of the processes fails the function that you used in your parallelisation will not return a value. If this happens, what does python or R put in the list of results ? The short answer is that it does not put anything in that is of any real use. One remedy to this is to handle the case of errors in your code directly. This may cover the first scenario above (bugs in your code) but does not handle the case of a system error like running out of memory (or failing to get a valid license key for third party commercial software perhaps !). 

However, there is a mechanism for "catching" these system like errors - **exceptions**.

The following examples can be found in the files exception.py and exception.R

#### python

```python
from multiprocessing import Pool


def infractus(n) :
    try :
        if n > 10 :
            return n/0 # ooops !!!
        return 2*n
    except ZeroDivisionError :
        return "divided by zero"
    except :
        return "something went wrong - not sure what"
    

pool = Pool(2)

result = pool.starmap(infractus,[(2,),(12,)])
```

#### R

```R
library(parallel)

infractus <- function(n)
{
   result <- tryCatch(
   {
	if(n > 10)
   	{
	  return(n + "a")
   	}
   	return(2*n)
   },
   error = function(cond)
   {
     return(cond)
   }
   )
   return(result)
}



result <- mclapply(list(2,12),infractus,mc.cores = 4)

infractus(2)

infractus(12)
```