# DS-GA-3001 Advanced Python for Data Science

Before you turn this problem in, make sure you **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart). You can then run the cells **in order**, during the class.

Any textual answers that need to be provided will be marked with "YOUR ANSWER HERE". Replace this text with your answer to the question.

Any code answers that need to be provided will be marked with:

```
# YOUR CODE HERE
raise NotImplementedError()
```

Replace all this code with your answer to the question. If you do not answer the question, the `NotImplementedError` exception will be raised, which will indicate to the grader that no answer has been supplied.

In many cases, code answers will also have some associated test code. You should execute the tests after you have entered your code in order to ensure that your answer is correct. You should not proceed to the next question until your answer is correct.

Finally, insert your Net ID and the Net ID's of any collaborators in the cell below.

In [1]:
NET_ID = "jl6583"
COLLABORATORS = ""

---

# Introduction to Parallel Programming with MPI (Part 2)

## Set up for MPI

It is assumed you have set up for MPI (either Docker or non-Docker) using the instructions from the first part. However, you will probably need to intialize your environment prior to starting this module.

### 1. Start the MPI cluster

Check that the **IPython Clusters** tab is visible. Switch to this tab and check and there is an **mpi** entry on the tab. Press the refresh button if not:

![](http://drive.google.com/uc?export=view&id=0B_3lImS7uRMgamVKb1cyU210S0k)

On the **mpi** entry, set the **# of engines** to **4**, then click the **Start** button. 

***Note that the # of engines must be less than or equal to the number of engines started.***

### 2. Initialize MPI

Run the following code to initialize MPI. Make sure there are no errors. 

*** Note: You may need to wait a few seconds after starting the engines before this code will work. Try running it again if you get an error.***

In [2]:
%pylab inline
from ipyparallel import Client, error
cluster = Client(profile="mpi")
view = cluster[:]
view.block=True
print len(cluster)

Populating the interactive namespace from numpy and matplotlib
4


## Non blocking communication

So far we have seen how to send and receive messages using blocking communication. In this case, the sender or receiver is not able to perform any other actions until the corresponding message has been sent or received (to be accurate, it is actually until the buffer is safe to use.)

Blocking communication has a number of disadvantages. Potential computational time is simply wasted while waiting for the call to complete. And as we have seen, blocking communication can also lead to deadlock.

An alternate approach is to allow the program to continue execution while the messages is being sent or received. This is known as non blocking communcation, and in MPI, is achieved using the `Isend` and `Irecv` methods. These methods return immediately, however the buffer is *not safe* for reuse.

The `Isend` and `Irecv` methods initiate a send and receive operation respectively. These methods return a instance of the `Request` class, which uniquely identifys the started operation. The completion can then be managed using the `Test`, `Wait`, and `Cancel` methods of the `Request` class. The management of `Request` objects and associated memory buffers involved in communication requires careful coordination. Users must ensure that objects exposing their memory buffers are not accessed at the Python level while they are involved in nonblocking message-passing operations.

The following example performs the same simple send and receive as demonstrated previously, however this time it is done with the non blocking versions of the send and receive methods.

<div class="alert alert-danger">
If you commented out the `Isend` and both `Wait` calls you would see that this code will not deadlock (if you don't comment out the `Wait` calls, then you have effectively the same code as before and it ***will*** deadlock.) However, unless you call `Cancel`, the Python kernel will eventually deadlock anyway as there will be an unequal number of messages posted, so I don't recommend doing it.
</div>

In [None]:
%%px
import numpy
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

randNum = numpy.zeros(1)


if rank == 1:
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "drew the number", randNum[0]
        req = comm.Isend(randNum, dest=0)
        req.Wait()

if rank == 0:
        print "Process", rank, "before receiving has the number", randNum[0]
        req = comm.Irecv(randNum, source=1)
        req.Wait()
        print "Process", rank, "received the number", randNum[0]

The following code is a non blocking version of the send and receive program. Note there is no need to wait after process 1 sends the message, nor after process 0 sends the reply. However it ***is*** necessary for process 1 to wait for the reply so that it knows the message has been fully received before trying to print it out. Similarly, process 0 must wait for the full message before trying to compute `randNum * 2`. Run it to verify the program works.

In [3]:
%%px
import numpy
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

randNum = numpy.zeros(1)
diffNum = numpy.random.random_sample(1)

if rank == 1:
        randNum = numpy.random.random_sample(1)
        print "Process", rank, "drew the number", randNum[0]
        comm.Isend(randNum, dest=0)
        req = comm.Irecv(randNum, source=0)
        req.Wait()
        print "Process", rank, "received the number", randNum[0]

if rank == 0:
        print "Process", rank, "before receiving has the number", randNum[0]
        req = comm.Irecv(randNum, source=1)
        req.Wait()
        print "Process", rank, "received the number", randNum[0]
        randNum *= 2
        comm.Isend(randNum, dest=1)

[stdout:0] 
Process 1 drew the number 0.331456767838
Process 1 received the number 0.662913535677
[stdout:1] 
Process 0 before receiving has the number 0.0
Process 0 received the number 0.331456767838


<div class="alert alert-success">
Modify this program so that process 1 overlaps a computation with sending the message and receiving the reply. The computation should be dividing diffNum by 3.14 and printing the result.
</div>

In [5]:
%%px
import numpy
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()

randNum = numpy.zeros(1)
diffNum = numpy.random.random_sample(1)

if rank == 1:
        randNum = numpy.random.random_sample(1)
        diffNum = randNum[0]
        print "Process", rank, "drew the number", randNum[0]
        comm.Isend(randNum, dest=0)
        diffNum /= 3.14
        print 'Process', rank, 'has diffNum', diffNum
        req = comm.Irecv(randNum, source=0)
        req.Wait()
        print "Process", rank, "received the number", randNum[0]

if rank == 0:
        print "Process", rank, "before receiving has the number", randNum[0]
        req = comm.Irecv(randNum, source=1)
        req.Wait()
        print "Process", rank, "received the number", randNum[0]
        randNum *= 2
        comm.Isend(randNum, dest=1)

[stdout:0] 
Process 1 drew the number 0.934555320787
Process 1 has diffNum 0.297629083053
Process 1 received the number 1.86911064157
[stdout:1] 
Process 0 before receiving has the number 0.0
Process 0 received the number 0.934555320787


It is possible to test without waiting using `Request.Test()`. This method will return `True` when the message operation has completed. To cancel a pending communication, call `Request.Cancel()`.

## Problem decomposition

One of the first steps in designing a parallel program is to break the problem into discrete "chunks" of work that can be distributed to multiple tasks. This is known as decomposition or partitioning. There are two main ways to decompose an algorithm: *domain decomposition* and *functional decomposition*.

### Domain decomposition

In this type of partitioning, the data associated with a problem is decomposed. Each parallel task then works on a portion of the data.

* Data divided into pieces of same size and mapped to different processors
* Processor works only on data assigned to it
* Communicates with other processors when necessary
* Examples of domain (data) decomposition
  * Embarrassingly parallel applications (Monte Carlo simulations)
  * Finite difference calculations
  * Numerical integration

![](http://drive.google.com/uc?export=view&id=0B_3lImS7uRMgQ0FBWnFfVTRlbnM)

There are many different ways to partition the data.

![](http://drive.google.com/uc?export=view&id=0B_3lImS7uRMgRm04SnZ0WFlvbjA)

### Functional decomposition 

The focus is on the computation that is to be performed rather than on the data manipulated by the computation. The problem is decomposed according to the work that must be done. Each task then performs a portion of the overall work.

* Used when pieces of data require different processing times
* Performance limited by the slowest process
* Program decomposed into a number of small tasks
* Tasks assigned to processors as they become available
* Implemented in a master/slave paradigm
* Examples of functional decomposition
  * Surface reconstruction from a finite element mesh
  * Searching images or data bases

![](http://drive.google.com/uc?export=view&id=0B_3lImS7uRMgemhaYzJOVTVJZVk)

## Domain decomposition example

An example of domain decomposition can be seen by computing a simple integral using the Mid-point rule.

### Integrate cos(x) by Mid-point Rule

![](http://drive.google.com/uc?export=view&id=0B_3lImS7uRMgdl9SNnNyZ3dUWjg)

$\displaystyle\int_{a}^{b}cos(x)dx = \displaystyle\sum_{i=0}^{p-1}\displaystyle\sum_{j=0}^{n-1}\int_{a_i+j*h}^{a_i+(j+1)*h} cos(x)dx$

$\approx \displaystyle\sum_{i=0}^{p-1}\left[\displaystyle\sum_{j=0}^{n-1}cos(a_{ij})*h\right]$ where $\begin{cases}h=\dfrac{b-a}{pn}\\a_i=a+i*n*h\\a_{ij}=a_i+(j+0.5)*h\end{cases}$

$p$ is the number of partitions

$n$ is the number of increments per partition

### Serial version

An example of the serial code to implement this is:

In [6]:
from math import acos, cos

# Compute the inner sum
def integral(ai, h, n):
    integ = 0.0
    for j in range(n):
        aij = ai + (j + 0.5) * h
        integ += cos(aij) * h
    return integ

pi = 3.14159265359
p = 4
n = 500
a = 0.0
b = pi / 2.0
h = (b - a) / (n * p)

integral_sum = 0.0

# Compute the outer sum
for i in range(p):
    ai = a + i * n * h
    integral_sum += integral(ai, h, n)
    
print "The integral = ", integral_sum

The integral =  1.0000000257


### Parallel point-to-point version

Since the problem has been decomposed into separate partitions, it is easy to implement a parallel version of the algorithm. In this case, each of the partitions can be computed by a separate process. Once each process has computed its partition, it sends the result back to a root process (in this case process 0) which sums the values and prints the final result. 

In [7]:
%%px
import numpy
from math import acos, cos
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

def integral(ai, h, n):
    integ = 0.0
    for j in range(n):
        aij = ai + (j + 0.5) * h
        integ += cos(aij) * h
    return integ

pi = 3.14159265359
n = 500
a = 0.0
b = pi / 2.0
h = (b - a) / (n * size)
ai = a + rank * n * h

# All processes initialize my_int with their partition calculation
my_int = numpy.full(1, integral(ai, h, n))

print "Process ", rank, " has the partial integral ", my_int[0]

if rank == 0:
    # Process 0 receives all the partitions and computes the sum
    integral_sum = my_int[0]
    for p in range(1, size):
        comm.Recv(my_int, source=p)
        integral_sum += my_int[0]
    
    print "The integral = ", integral_sum
else:
    # All other processes just send their partition values to process 0
    comm.Send(my_int, dest=0)

[stdout:0] Process  1  has the partial integral  0.32442335716
[stdout:1] 
Process  0  has the partial integral  0.382683442201
The integral =  1.0000000257
[stdout:2] Process  2  has the partial integral  0.216772756896
[stdout:3] Process  3  has the partial integral  0.0761204694451


## Collective operations

There are many situations in parallel programming when groups of processes need to exchange messages. Rather than explicitly sending and receiving such messages, MPI provides group operations known as collectives.

Collective communications allow the sending of data between multiple processes of a group simultaneously. Collective functions come in blocking and non-blocking versions.

The more commonly used collective communication operations are the following:

* Synchronization
  * Processes wait until all members of the group have reached the synchronization point
* Global communication functions
  * Broadcast data from one member to all members of a group
  * Gather data from all members to one member of a group
  * Scatter data from one member to all members of a group
* Collective computation (reductions)
  * One member of the group collects data from the other members and performs an operation (min, max, add, multiply, etc.) on that data.

![](http://drive.google.com/uc?export=view&id=0B_3lImS7uRMgY1RuYTNtNkFMbW8)

***Collective communication routines must involve all processes within the scope of a communicator.***

All processes are by default, members in the communicator MPI.COMM_WORLD, however additional communicators can be defined by the programmer (beyond the scope of this course).

<div class="alert alert-danger">
Unexpected behavior, including program failure, can occur if even one task in the communicator doesn't participate.
It is the programmer's responsibility to ensure that all processes within a communicator participate in any collective operations.
</div>

### Example collective operations

#### `Comm.Barrier()`
Synchronization operation. Creates a barrier synchronization in a group. Each task, when reaching the `Barrier()` call, blocks until all tasks in the group reach a `Barrier()` call. Then all tasks are free to proceed.

#### `Comm.Bcast(buf, root=0)`
Data movement operation. Broadcasts (sends) a message from the process with rank "root" to all other processes in the group. 

#### `Comm.Scatter(sendbuf, recvbuf, root=0)`
Data movement operation. Distributes distinct messages from a single source task to each task in the group. 

#### `Comm.Gather(sendbuf, recvbuf, root=0)`
Data movement operation. Gathers distinct messages from each task in the group to a single destination task. This routine is the reverse operation of `Scatter()`. 

#### `Comm.Alltoall(sendbuf, recvbuf)`
All-to-all Scatter/Gather, send data from all to all processes in a group.

#### `Comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=0)`
Reduces values on all processes to a single value by applying the operation `op`. Operations include:
* `MPI.MAX` - Returns the maximum element.
* `MPI.MIN` - Returns the minimum element.
* `MPI.SUM` - Sums the elements.
* `MPI.PROD` - Multiplies all elements.
* `MPI.LAND` - Performs a logical and across the elements.
* `MPI.LOR` - Performs a logical or across the elements.
* `MPI.BAND` - Performs a bitwise and across the bits of the elements.
* `MPI.BOR` - Performs a bitwise or across the bits of the elements.
* `MPI.MAXLOC` - Returns the maximum value and the rank of the process that owns it.
* `MPI.MINLOC` - Returns the minimum value and the rank of the process that owns it.

### Parallel collective version of Mid-point rule

The example below shows how the mid-point rule can be computed using collective operations. 

We choose to broadcast the number of increments per partition `n` to each process, although this is not strictly necessary. Once the processes have received `n` they are able to compute their partition. The processes then send the values back to the `root` process using `Reduce` which automatically computes the sum of all the values and places the result in `integral_sum`.

In [8]:
%%px
import numpy
from math import acos, cos
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

def integral(ai, h, n):
    integ = 0.0
    for j in range(n):
        aij = ai + (j + 0.5) * h
        integ += cos(aij) * h
    return integ

pi = 3.14159265359
a = 0.0
b = pi / 2.0
h = (b - a) / (n * size)
dest = 0
n = numpy.zeros(1)
my_int = numpy.zeros(1)
integral_sum = numpy.zeros(1)

# Initialize value of n only if this is rank 0
if rank == 0:
    n = numpy.full(1, 500) # default value
    
# Broadcast n to all processes
print "Process ", rank, " before n = ", n[0]
comm.Bcast(n, root=0)
print "Process ", rank, " after n = ", n[0]

# Compute partition
ai = a + rank * h * n
my_int[0] = integral(ai, h ,n)

# Send partition back to root process, computing sum across all partitions
print "Process ", rank, " has the partial integral ", my_int[0]
comm.Reduce(my_int, integral_sum, MPI.SUM, dest)

# Only print the result in process 0
if rank == 0:
    print 'The Integral Sum =', integral_sum[0]

[stdout:0] 
Process  1  before n =  0.0
Process  1  after n =  500.0
Process  1  has the partial integral  0.32442335716
[stdout:1] 
Process  0  before n =  500.0
Process  0  after n =  500.0
Process  0  has the partial integral  0.382683442201
The Integral Sum = 1.0000000257
[stdout:2] 
Process  2  before n =  0.0
Process  2  after n =  500.0
Process  2  has the partial integral  0.216772756896
[stdout:3] 
Process  3  before n =  0.0
Process  3  after n =  500.0
Process  3  has the partial integral  0.0761204694451


[stderr:1] 


<div class="alert alert-success">
Modify the above code to broadcast both the number of increments `n` *and* the increment width `h` to each process. Hint: `h` will need to be a NumPy array.
</div>

In [9]:
%%px
import numpy
from math import acos, cos
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

def integral(ai, h, n):
    integ = 0.0
    for j in range(n):
        aij = ai + (j + 0.5) * h
        integ += cos(aij) * h
    return integ

pi = 3.14159265359
a = 0.0
b = pi / 2.0
h = numpy.zeros(1)
dest = 0
n = numpy.zeros(1)
my_int = numpy.zeros(1)
integral_sum = numpy.zeros(1)

# Initialize value of n only if this is rank 0
if rank == 0:
    n = numpy.full(1, 500) # default value
    h = numpy.full(1,(b - a) / (n * size))
    
# Broadcast n to all processes
print "Process ", rank, " before n = ", n[0], " h = ", h[0]
comm.Bcast(n, root=0)
comm.Bcast(h, root=0)
print "Process ", rank, " after n = ", n[0], " h = ", h[0]

# Compute partition
ai = a + rank * h * n
my_int[0] = integral(ai, h ,n)

# Send partition back to root process, computing sum across all partitions
print "Process ", rank, " has the partial integral ", my_int[0]
comm.Reduce(my_int, integral_sum, MPI.SUM, dest)

# Only print the result in process 0
if rank == 0:
    print 'The Integral Sum =', integral_sum[0]

[stdout:0] 
Process  1  before n =  0.0  h =  0.0
Process  1  after n =  500.0  h =  0.000785398163398
Process  1  has the partial integral  0.32442335716
[stdout:1] 
Process  0  before n =  500.0  h =  0.000785398163398
Process  0  after n =  500.0  h =  0.000785398163398
Process  0  has the partial integral  0.382683442201
The Integral Sum = 1.0000000257
[stdout:2] 
Process  2  before n =  0.0  h =  0.0
Process  2  after n =  500.0  h =  0.000785398163398
Process  2  has the partial integral  0.216772756896
[stdout:3] 
Process  3  before n =  0.0  h =  0.0
Process  3  after n =  500.0  h =  0.000785398163398
Process  3  has the partial integral  0.0761204694451


## Final Notes

mpi4py supports two kinds of Python data objects. When using the upper case version of the methods (`Send`, `Irecv`, `Gather`, etc.) the data object must support the single-segment buffer interface. This interface is a standard Python mechanism provided by some types (e.g., strings and numeric arrays), which is why we have been using NumPy arrays in the examples. It is also possible to transmit an arbitrary Python data type using the lower case version of the methods (`send`, `irecv`, `gather`, etc.) mpi4py will serialize the data type, send it to the remote process, then deserialize it back to the original data type (a process known as *pickling* and *unpickling*). While this is simple, it also adds significant overhead to the MPI operaton.

There are many other MPI operations available that we have not touched on here. Please refer to the [mpi4py documentation](http://mpi4py.readthedocs.org/en/stable/index.html) for more information if you are interested.