<div style='background-image: url("../../share/images/header.svg") ; padding: 0px ; background-size: cover ; border-radius: 5px ; height: 250px'>
    <div style="float: right ; margin: 50px ; padding: 20px ; background: rgba(255 , 255 , 255 , 0.7) ; width: 50% ; height: 150px">
        <div style="position: relative ; top: 50% ; transform: translatey(-50%)">
            <div style="font-size: xx-large ; font-weight: 900 ; color: rgba(0 , 0 , 0 , 0.8) ; line-height: 100%">Computational Seismology</div>
            <div style="font-size: large ; padding-top: 20px ; color: rgba(0 , 0 , 0 , 0.5)">Parallel Computing - An introduction to MPI4Py</div>
        </div>
    </div>
</div>

Seismo-Live: http://seismo-live.org

##### Authors:
* David Vargas ([@dvargas](https://github.com/davofis))
* Heiner Igel ([@heinerigel](https://github.com/heinerigel))

This notebook covers the following aspects:

* Introduction to parallel computing
* Serial vs. parallel computing
* Flynn's Classical Taxonomy
* General concepts and terminology
* Amdahl's Law
* Introduction to MPI4Py
---

## 1. Introduction to Parallel Computing

In parallel computing, many calculations are executed simultaneously. The real world is massively parallel, since many independent and complex processes occur at the same time. This notebook introduces some of the most basic concepts of parallel programming with MPI4Py, a Python module that provides bindings to the Message Passing Interface (MPI) standard.

### 1.1 Serial vs. parallel computing
A parallel computer can be understood as a set of processors working together to perform a specific task. In contrast, a serial computer executes one instruction at a time on a single processor until the task is completed (see figures below). The most relevant characteristics of each approach are listed below:

<p style="width:30%;float:right;padding-left:10px;padding-right:20px">
<img src=images/Parallel_Computing.png>

<p style="width:30%;float:right;padding-left:10px;padding-right:20px">
<img src=images/Serial_Computing.png>

Serial computing:
* A problem is broken into a discrete series of instructions
* Instructions are executed sequentially one after another
* Executed on a single processor
* Only one instruction may execute at any moment in time 

Parallel computing:
* A problem is broken into discrete parts that can be solved concurrently
* Each part is further broken down to a series of instructions
* Instructions from each part execute simultaneously on different processors
* An overall control/coordination mechanism is employed
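
The parallel pattern listed above (split the problem into parts, work on the parts concurrently, coordinate the results) can be sketched with Python's standard library. This is only an illustration and not part of the original notebook: the helper names `partial_sum` and `parallel_sum` are hypothetical, and threads stand in for processors. In CPython, threads do not speed up CPU-bound work like this one, so the sketch shows the structure of a parallel program rather than a performance gain.

```python
from concurrent.futures import ThreadPoolExecutor


def partial_sum(chunk):
    """Work done on one part of the problem: sum of squares of a chunk."""
    return sum(x * x for x in chunk)


def parallel_sum(values, n_workers=4):
    """Split `values` into parts, process them concurrently, combine the results."""
    # Break the problem into discrete parts (one strided slice per worker).
    chunks = [values[i::n_workers] for i in range(n_workers)]
    # The executor acts as the control/coordination mechanism.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partials = list(pool.map(partial_sum, chunks))
    # Combine the partial results into the final answer.
    return sum(partials)


values = list(range(1000))
serial = sum(x * x for x in values)   # one sequence of instructions
parallel = parallel_sum(values)       # four parts handled concurrently
print(serial == parallel)
```

Both approaches must give the same answer; only the way the work is organized differs.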


### 1.2 Flynn's Classical Taxonomy
<p style="width:40%;float:right;padding-left:40px;padding-right:30px">
<img src=images/flynn.png>

<div style="text-align: justify">
Flynn's taxonomy is a classification of computer architectures that has been in use since 1966. It is based on the number of concurrent instruction streams (single or multiple) and data streams (single or multiple) available in the architecture. The four categories in Flynn's taxonomy are the following:
</div>
* Single instruction stream, single data stream (SISD)
* Single instruction stream, multiple data streams (SIMD)
* Multiple instruction streams, single data stream (MISD)
* Multiple instruction streams, multiple data streams (MIMD)

### 1.3 General Concepts and Terminology
* **Node:** A single computer. Nodes are networked together to comprise a supercomputer.
* **CPU / Socket / Processor / Core:** CPUs with multiple cores are sometimes called "sockets" 
* **Task:** A discrete section of computational work
* **Pipelining:** Breaking a task into steps that are performed by different processor units
* **Shared Memory:** Computer architecture where all processors have direct access to common physical memory
* **Distributed Memory:** Computer architecture where access to physical memory is not common
* **Embarrassingly Parallel:** Solving many similar, but independent tasks simultaneously
* **Scalability:** Refers to an increase in speedup with the addition of more resources

### 1.4 Amdahl's Law
<div style="text-align: justify">
The scalability, or potential speedup, of a program is determined by the fraction of code ($p$) that can be parallelized.
Amdahl's Law gives the speedup obtained when running a program on parallel processors compared with the serial case. We define speedup as the time it takes a program to execute in serial (with one processor) divided by the time it takes to execute in parallel (with $n$ processors), i.e.

<br>
<br>
\begin{equation}
Speedup = \frac{t(1)}{t(n)} = \frac{1}{\frac{p}{n} + s}
\end{equation}

</div>
* $p$ = Fraction of code that can be parallelized
* $n$ = Number of processors
* $s$ = Serial fraction
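
The formula can be explored numerically with a few lines of Python. The sketch below is not part of the original notebook; the function name `amdahl_speedup` is hypothetical, and it assumes that the serial and parallel fractions add up to one, i.e. $s = 1 - p$. It shows that for $p = 0.9$ the speedup saturates below $1/s = 10$, no matter how many processors are added.

```python
def amdahl_speedup(p, n):
    """Speedup predicted by Amdahl's Law for parallel fraction p on n processors."""
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must lie in [0, 1]")
    if n < 1:
        raise ValueError("n must be at least 1")
    s = 1.0 - p  # serial fraction (assumption: s + p = 1)
    return 1.0 / (p / n + s)


for n in (1, 2, 4, 16, 1024):
    print("p = 0.90, n = %4d -> speedup = %.2f" % (n, amdahl_speedup(0.90, n)))

# As n grows, the speedup approaches the limit 1 / s.
print("upper bound for p = 0.90: %.1f" % (1.0 / (1.0 - 0.90)))
```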
---

## 2. Introduction to MPI4Py
Python is one of the most popular languages in the scientific community. Sharing code to reproduce simulations has gained relevance in recent years ([see this Nature article](http://www.nature.com/news/interactive-notebooks-sharing-the-code-1.16261)). In this context, parallel computing with Python is essential for executing algorithms as efficiently as possible. mpi4py bridges MPI and Python: it provides Python bindings to most of the functionality of the Message Passing Interface (MPI) standard. The official web site for mpi4py is [mpi4py.scipy.org](http://mpi4py.scipy.org/); there you can also find a [user manual](http://mpi4py.scipy.org/docs/usrman/index.html) and an [API reference](http://mpi4py.scipy.org/docs/apiref/index.html) with a more comprehensive description.

### Getting started
This notebook only gives a short description of the principal elements of the mpi4py package. Before you start, make sure you have launched a new IPython cluster with the desired number of engines; the examples below address ranks 0 to 3, so at least four engines are needed. Then run the IPython cluster setup cell and the imports cell, which allow you to use MPI4Py in the Jupyter notebook.

### IPython cluster setup

In [None]:
# Import all necessary libraries, this is a configuration step for the exercise.
# Please run it before the simulation code!
from ipyparallel import Client
cluster = Client(profile='mpi')
cluster.block = True  # use synchronous computations
dview = cluster[:]
dview.activate()      # enable magics

cluster.ids

### Imports
Libraries are imported on all workers. This is a configuration step for the exercise. Please run it before the simulation code!

In [None]:
%%px
from mpi4py import MPI
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from mpl_toolkits.mplot3d import Axes3D

import warnings
warnings.filterwarnings("ignore")

# Show the plots in the Notebook.
#%pylab inline
#==================================
comm = MPI.COMM_WORLD

size = comm.Get_size()
rank = comm.Get_rank()
name = MPI.Get_processor_name()
#==================================

### 1. Hello World

In [None]:
%%px
print("Hello, World! I am process %d of %d on %s.\n" % (rank, size, name))

### 2.1 Operations on different processors I 

In [None]:
%%px
# Variables defined on each processor
a = 2
b = 3

if rank == 0:
    print('a + b = %d' % (a + b))
if rank == 1:
    print('a * b = %d' % (a * b))
if rank == 2:
    print('a / b = %.3f' % (a / b))   # true division yields a float; %d would truncate it to 0
if rank == 3:
    print('max(a,b) = %d' % max(a, b))

### 2.2 Operations on different processors II

In [None]:
%%px
#------------------------------------------------------------------
# Processor 0
#------------------------------------------------------------------
if rank == 0:
    fig = plt.figure(figsize=(6,5))
    t = np.arange(0.0, 6.0, 0.01)
    y1 = np.exp(-t) * np.cos(2*np.pi*t)
    y2 = np.exp(-t)
    y3 = -np.exp(-t)
    plt.plot(t, y2,'b--', t, y3,'b--',t, y1,'g', lw=2)
#------------------------------------------------------------------
# Processor 1
#------------------------------------------------------------------
if rank == 1:
    fig = plt.figure(figsize=(6,5))
    plt.plot(np.random.rand(100),np.random.rand(100), 'r')
    
#------------------------------------------------------------------
# Processor 2
#------------------------------------------------------------------    
if rank == 2:
    def lorenz(x, y, z, s=10, r=28, b=2.667):
        x_dot = s*(y - x)
        y_dot = r*x - y - x*z
        z_dot = x*y - b*z
        return x_dot, y_dot, z_dot

    dt = 0.01
    stepCnt = 10000

    # Need one more for the initial values
    xs = np.empty((stepCnt + 1,))
    ys = np.empty((stepCnt + 1,))
    zs = np.empty((stepCnt + 1,))

    # Setting initial values
    xs[0], ys[0], zs[0] = (0., 1., 1.05)

    # Stepping through "time".
    for i in range(stepCnt):
        # Derivatives of the X, Y, Z state
        x_dot, y_dot, z_dot = lorenz(xs[i], ys[i], zs[i])
        xs[i + 1] = xs[i] + (x_dot * dt)
        ys[i + 1] = ys[i] + (y_dot * dt)
        zs[i + 1] = zs[i] + (z_dot * dt)

    fig = plt.figure(figsize=(6,5))
    ax = fig.add_subplot(projection='3d')  # fig.gca(projection=...) is no longer supported in recent Matplotlib releases

    ax.plot(xs, ys, zs, lw=0.2)
#------------------------------------------------------------------
# Processor 3
#------------------------------------------------------------------
if rank == 3:
    # Fixing random state for reproducibility
    np.random.seed(19680801)

    mu, sigma = 100, 15
    x = mu + sigma * np.random.randn(10000)

    # the histogram of the data
    fig = plt.figure(figsize=(6,5))
    n, bins, patches = plt.hist(x, 50, density=True, facecolor='g', alpha=0.75)  # `normed` was replaced by `density`
    
    plt.xlabel('Smarts')
    plt.ylabel('Probability')
    plt.title('Histogram of IQ')
    plt.text(60, .025, r'$\mu=100,\ \sigma=15$')
    plt.axis([40, 160, 0, 0.03])
    plt.grid(True)
    
plt.show()    

### 3. Send, recv
<div style="text-align: justify">  

<p style="width:40%;float:right;padding-left:40px;padding-right:60px">
<img src=images/point_point.png>
<span style="font-size:smaller">
</span>
</p>

Sending a message from one process to another is known as point-to-point communication. In the message-passing programming model this is a two-step process: one task sends data while another receives it. The sending task must specify the data to be sent, its destination, and an identifier (tag) for the message. The receiving task must specify where to store the received data, its source, and the tag of the message to be received.

</div>

Method for sending: 
```
comm.send(buf, dest=0, tag=0)

```
Method for receiving: 
```
comm.recv(obj=None, source=0, tag=0, status=None)
```

In [None]:
%%px
data = None
if rank == 0:
    data = (1, 'a', 'z', 3.14)
    comm.send(data, dest=1, tag=11)
    print('On rank', rank, ':  data = ', data)
elif rank == 1:
    print('On rank', rank, 'before recv:  data = ', data)
    data = comm.recv(source=0, tag=11)
    print('On rank', rank, 'after  recv:  data = ', data)

### 4. Broadcast
<div style="text-align: justify">  

<p style="width:40%;float:right;padding-left:40px;padding-right:60px">
<img src=images/Bcast.png>
<span style="font-size:smaller">
</span>
</p>

Collective communication involves all processes of a communicator exchanging data at once. Typical operations include Broadcast, Scatter, Gather, and Reduction. We start with Broadcast, which sends data held by the master (root) engine to all other (worker) engines at once. The uppercase Bcast() method transfers buffer-like objects such as NumPy arrays, while the lowercase bcast() method transfers generic Python objects. Their syntax is:
</div>

```
comm.Bcast(buf, root=0)
buf = comm.bcast(obj=None, root=0)
```

In [None]:
%%px
print("-"*20)
print('On Rank:  ', rank)
print("-"*20)
# Prepare a vector of N=5 elements to be broadcast...
N = 5
if rank == 0:
    A = np.arange(N, dtype=np.float64)    # rank 0 holds the actual data
    print("Before broadcast A = %s" % A)
else:
    A = np.zeros(N, dtype=np.float64)     # all other ranks start with a zero-filled buffer
    print("Before broadcast A = %s" % A)

# Broadcast A from rank 0 to everybody
comm.Bcast( [A, MPI.DOUBLE] )

# Everybody should now have the same...
for r in range(size):
    if rank == r:
        print("After  broadcast A = %s" % A)

### 5. Scatter
<div style="text-align: justify">  

<p style="width:40%;float:right;padding-left:40px;padding-right:60px">
<img src=images/Scatter.png>
<span style="font-size:smaller">
</span>
</p>
Scatter breaks up the data held by the root and distributes one part to each processor. Whereas broadcast sends the same object from the root engine to all others, scatter sends a different part of the data to each processor, the root included. The syntax of the Scatter() method is
</div>

```
comm.Scatter(sendbuf, recvbuf, root=0)
```

In [None]:
%%px
print("-"*20)
print('On Rank:  ', rank)
print("-"*20)

n = 2
A = np.zeros(n, dtype=np.float64)

if rank == 3:
    data = np.arange(n*size, dtype=np.float64) + 1
    print('Data to be scattered: data = %s' % data)
    print('Before scatter: A = %s' % A)
else:
    data = np.zeros(n*size, dtype=np.float64)
    print('Before scatter: A = %s' % A)

comm.Scatter([data, MPI.DOUBLE], [A, MPI.DOUBLE], root=3)  # rank 3 holds the data, so it must be the root

print("After  scatter: A = %s" % A)
print('') 
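
To see which part of the root buffer each rank should end up with, the splitting rule of Scatter can be reproduced serially. The sketch below involves no MPI at all and is not part of the original notebook: `scatter_chunks` is a hypothetical helper, and four engines are assumed. It only illustrates that the buffer is cut into equal, contiguous pieces that are handed out in rank order.

```python
def scatter_chunks(data, size):
    """Return the pieces a root buffer is split into, one per rank, in rank order."""
    if len(data) % size != 0:
        raise ValueError("buffer length must be a multiple of the number of ranks")
    n = len(data) // size  # number of elements received by each rank
    return [data[r * n:(r + 1) * n] for r in range(size)]


size = 4                                          # assumed number of engines
n = 2                                             # elements per rank, as in the cell above
data = [float(i + 1) for i in range(n * size)]    # 1.0 ... 8.0 on the root

for r, chunk in enumerate(scatter_chunks(data, size)):
    print("rank %d receives %s" % (r, chunk))
```

Concatenating the pieces again in rank order restores the original buffer, which is what the gather operation does.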

### 6. Gather
<div style="text-align: justify">  

<p style="width:40%;float:right;padding-left:40px;padding-right:60px">
<img src=images/Gather.png>
<span style="font-size:smaller">
</span>
</p>

Gather is the opposite of scatter: it collects data from all cores and stores it, in rank order, on a single core. The syntax of the Gather() method is
</div>

```
comm.Gather(sendbuf, recvbuf, root=0)
```

In [None]:
%%px
print("-"*20)
print('On Rank:  ', rank)
print("-"*20)

n = 2
A = np.zeros(n, dtype=np.float64)

data = np.arange(n*size, dtype=np.float64) + 1
comm.Scatter( [data, MPI.DOUBLE], [A, MPI.DOUBLE], root=0 )
data = np.zeros(n*size, dtype=np.float64) 

print("Before Gather: A = %s" % A)
comm.Gather( [A, MPI.DOUBLE], [data, MPI.DOUBLE], root=3 )
print("After  Gather: data = %s" % data)
print('') 

### 7. Allgather
<div style="text-align: justify">  

<p style="width:35%;float:right;padding-left:40px;padding-right:60px">
<img src=images/Allgather.png>
<span style="font-size:smaller">
</span>
</p>
The gather operation collects data from all tasks and delivers the collection to the root task. In contrast, allgather delivers the collection to all processors. The syntax of the Allgather() method is
</div>
```
comm.Allgather(sendbuf, recvbuf)
```

In [None]:
%%px
print("-"*20)
print('On Rank:  ', rank)
print("-"*20)

n = 2
A = np.zeros(n, dtype=np.float64)

data = np.arange(n*size, dtype=np.float64) + 1
comm.Scatter( [data, MPI.DOUBLE], [A, MPI.DOUBLE], root=0 )
data = np.zeros(n*size, dtype=np.float64) 

print("Before Allgather: A = %s" % A)
comm.Allgather( [A, MPI.DOUBLE], [data, MPI.DOUBLE] )
print("After  Allgather: data = %s" % data)
print('')   

### 8. Reduce
<div style="text-align: justify">  

<p style="width:40%;float:right;padding-left:40px;padding-right:60px">
<img src=images/Reduce.png>
<span style="font-size:smaller">
</span>
</p>
Data reduction combines a set of numbers into a smaller set of numbers via a function. The reduction methods of MPI combine data from all processors with a specific operation and deliver the result to a specific engine (the root). mpi4py provides the following operations for reduction:
</div>

```
MPI.MIN        minimum
MPI.MAX        maximum
MPI.SUM        sum
MPI.PROD       product
MPI.LAND       logical and
MPI.BAND       bitwise and
MPI.LOR        logical or
MPI.BOR        bitwise or
MPI.LXOR       logical xor
MPI.BXOR       bitwise xor
MPI.MAXLOC     max value and location
MPI.MINLOC     min value and location
```

The syntax of the Reduce() method is:
```
comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)
```

In [None]:
%%px
# Array defined on each processor
A = np.zeros(size)
for i in range(comm.rank, len(A), comm.size):
    # set data in each array 
    A[i] = i + 1
print("On rank %d, A = %s" % (rank, A))

if rank == 0:
    result = np.zeros(size)  # only the root (rank 0) receives the reduced data
else:
    result = None

comm.Reduce([A, MPI.DOUBLE], [result, MPI.DOUBLE], op=MPI.SUM, root=0)
if rank == 0:
    print("On rank %d, Reduce = %s" % (rank, result))
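
The result to expect on the root can be checked without MPI by rebuilding every rank's array serially and combining them position by position. This is a plain-Python sketch, not part of the original notebook, and it assumes four engines; the list `arrays` simply stands in for the buffers that live on the separate ranks.

```python
size = 4  # assumed number of engines

# Rebuild the array A that each rank creates in the cell above:
# rank r sets only A[r] = r + 1 and leaves the other entries at zero.
arrays = []
for r in range(size):
    A = [0.0] * size
    for i in range(r, len(A), size):
        A[i] = i + 1.0
    arrays.append(A)

# MPI.SUM combines the buffers element by element across ranks.
total = [sum(column) for column in zip(*arrays)]
# MPI.MAX would instead keep the largest value found in each position.
largest = [max(column) for column in zip(*arrays)]

print("sum:", total)
print("max:", largest)
```

Sum and maximum coincide here because every position is nonzero on exactly one rank; with overlapping data the two operations would differ.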

### 9. Allreduce
<div style="text-align: justify">  

<p style="width:40%;float:right;padding-left:40px;padding-right:60px">
<img src=images/Allreduce.png>
<span style="font-size:smaller">
</span>
</p>
Allreduce relates to Reduce as Allgather relates to Gather: the reduction operation is performed on the data, and the result is delivered to all processors rather than to the root only.
   
</div>

The syntax of the Allreduce() method is:
```
comm.Allreduce(sendbuf, recvbuf, op=MPI.SUM)
```

In [None]:
%%px 
A = np.zeros(size)
for i in range(comm.rank, len(A), comm.size):
    # set data in each array 
    A[i] = i + 1
print("On rank %d, A = %s" % (rank, A))

total = np.zeros(size)
    
comm.Allreduce([A, MPI.DOUBLE], [total, MPI.DOUBLE], op = MPI.SUM)

print("On rank %d, Allreduce = %s" % (rank, total))
print('')