# Several exercises: Jupyter Notebook, iPython and ipyparallel, and HPC MPICH

The content of this notebook is borrowed extensively from Daan Van Hauwermeiren, from his tutotial of ipyparallel, stored on github https://github.com/DaanVanHauwermeiren/ipyparallel-tutorial/blob/master/02-ipyparallel-tutorial-direct-interface.ipynb.  Many adaptions have been made to accommodate this demonstration and this HPC MPI environment.

Prior to running these steps in this notebook, the following details must
be completed outside the context of Jupyter, and generally will need to be 
facilitated by a systems administator with appropriate rights and knowledge 
of HPC and MPI.

  1) HPC and MPICH have been configured, with compute nodes running
  2) Related to HPC and MPICH, an NFS/NIS environment exists to facilitate the 'scientist' user environment across all of the computing resources in the cluster
  3) The ipyparallel client/engine environment must be configured and started that supports MPI/MPICH
  4) Ensure that the "IPython Cluster" called "mpi" in the JupyterHub is running

Once the above details have been acomplished, import the IPython ipyparallel module and create a Client instance

In [3]:
# import the IPython ipyparallel module and create a Client instance
# In this demonstration, an MPI-oriented client is created, referenced by the 'mpi' profile
# There are 4 mpi engines that have been configured and running on 4 separate HPC compute nodes
import ipyparallel as ipp
rc = ipp.Client(profile='mpi')

# Show that there are engines running, responding
rc.ids


[0, 1, 2, 3]

In [4]:
# Create an ipyparallel object, constructed via list-access to the client:
 vobject = rc[:]

Python’s builtin map() functions allows a function to be applied to a sequence element-by-element. This type of code is typically trivial to parallelize. In fact, since IPython’s interface is all about functions anyway, you can just use the builtin map() with a RemoteFunction, or a vobject’s map() method.

do an arbitrary serial computation using just the power of the HPC head node, 
... and show how long it takes to compute

In [8]:
%%time
serial_result = list(map(lambda x:x**2**2, range(30)))

CPU times: user 18 µs, sys: 3 µs, total: 21 µs
Wall time: 25.5 µs


Now do the same computation using the MPI compute nodes HPC cluster, 
... and show how long it takes

In [9]:
%%time
parallel_result = vobject.map_sync(lambda x:x**2**2, range(30))

CPU times: user 176 ms, sys: 53.1 ms, total: 229 ms
Wall time: 334 ms


In [10]:
serial_result==parallel_result

True

# Remote function decorators

Remote functions are just like normal functions, but when they are called, they execute on one or more engines, rather than locally.  Here we will demonstrate the @parallel function decorator, which creates parallel functions that break up an element-wise operations and distribute them to remote workers.  It also reconstructs the result from each worker as the result is returned.

In [11]:
# First, we'll enable blocking, which will be explored more throughly later.  
# In short, blocking will ensure that each task won't proceed until all the remotely distributed work is complete.
@vobject.remote(block=True)

# Define a function called "getpid" that ... well, you can see the description
def getpid():
    '''
    import library os and return the process number (pid) corresponding with
    the execution
    '''
    import os
    return os.getpid()

In [12]:
# Using our newly defined function, show the process id of the engine running on each compute node
getpid()

[17653, 5950, 5785, 5685]

We'll use numpy to create some complicated (random) arrays, then use those arrays for some big computations that should benefit by some distributed HPC compute resources.

In [13]:
import numpy as np
A = np.random.random((64,48))

In [14]:
# Create a little function that can do the calculations as a distribution among multiple, parallel compute nodes
@vobject.parallel(block=True)
def pmul(A,B):
    return A*B

We want to be able to compare the amount of time it takes to do the calulation locally on the HPC head
and the amount of time it takes to do the calculation among the distributed compute nodes

First, do the calculation locally, then do it remotely

In [15]:
%%time
C_local = A*A

CPU times: user 27 µs, sys: 6 µs, total: 33 µs
Wall time: 37.4 µs


In [16]:
%%time
C_remote = pmul(A,A)

CPU times: user 20.3 ms, sys: 51 µs, total: 20.3 ms
Wall time: 23.8 ms


In [17]:
(C_local == C_remote).all()

True

Create a simple, new function that can be called locally but that will execute remotely, in parallel.
It's just a simple instruction that will "echo" the output of what is run on the remote worker

In [18]:
@vobject.parallel(block=True)
def echo(x):
     return str(x)

In [19]:
echo(range(5))

['range(0, 2)', 'range(2, 3)', 'range(3, 4)', 'range(4, 5)']

In [20]:
echo.map(range(5))

['0', '1', '2', '3', '4']

# Blocking execution

In blocking mode, the iPython ipyparallel object (called vobject in these examples; defined at the beginning of this notebook) submits the command to the controller, which places the command in the engines’ queues for execution. The apply() call then blocks until the engines are done executing the command.

In [21]:
# Show function names (on the remote worker) that beging with the string "apply"
[x for x in dir(vobject) if x.startswith('apply')]

['apply', 'apply_async', 'apply_sync']

In [22]:
vobject.block = True
vobject['a'] = 5
vobject['b'] = 10
vobject.apply(lambda x: a+b+x, 27)

[42, 42, 42, 42]

In [23]:
vobject.block = False
vobject.apply_sync(lambda x: a+b+x, 27)

[42, 42, 42, 42]

Python commands can be executed as strings on specific engines by using a vobject’s execute method:

In [24]:
rc[::2].execute('c=a+b')

<AsyncResult: execute>

In [26]:
rc[1::2].execute('c=a-b')

<AsyncResult: execute>

In [27]:
vobject['c'] # shorthand for vobject.pull('c', block=True)

[15, -5, 15, -5]

# Non-blocking execution

In non-blocking mode, apply() submits the command to be executed and then returns a AsyncResult object immediately. The AsyncResult object gives you a way of getting a result at a later time through its get() method.

More info on the AsyncResult object: http://ipyparallel.readthedocs.io/en/6.0.2/asyncresult.html#parallel-asyncresult

This allows you to quickly submit long running commands without blocking your local Python/IPython session:

In [28]:
# define our function
def wait(t):
    import time
    tic = time.time()
    time.sleep(t)
    return time.time()-tic

In [29]:
# In non-blocking mode
ar = vobject.apply_async(wait, 3)

In [30]:
# Now block for the result, and the output won't disply until after 3 seconds
ar.get()

[3.0031118392944336, 3.003131151199341, 3.0031638145446777, 3.0031161308288574]

In [31]:
# Again in non-blocking mode, with longer wait (10 seconds)
ar = vobject.apply_async(wait, 10)

In [35]:
# Poll to see if the result is ready
# If you run this fast enough following the previous step, the output will be "False"
# But if you wait for 10 seconds, before executing this step, the output will be "True"
ar.ready()

True

In [36]:
# ask for the result, but wait a maximum of 1 second:
ar.get(1)

[10.010129928588867, 10.01012921333313, 10.01011323928833, 10.010184049606323]

Often, it is desirable to wait until a set of AsyncResult objects are done. For this, there is the method wait(). This method takes a tuple of AsyncResult objects (or msg_ids or indices to the client’s History), and blocks until all of the associated results are ready.

In proper Jupyter Notebook fashion, the step progress indicator will show as a '*' character until the instruction is completed.  Output will not be displayed until the instruction is completed.

In [38]:
vobject.block=False
# A trivial list of AsyncResults objects
pr_list = [vobject.apply_async(wait, 3) for i in range(10)]
# Wait until all of the clients have completed the instruction
vobject.wait(pr_list)
# Then, their results are ready using get() or the `.r` attribute
pr_list[0].get()

[3.0031027793884277, 3.0031118392944336, 3.0030837059020996, 3.003117322921753]

# Scatter and gather

Sometimes it is useful to partition a sequence and push the partitions to different engines. In MPI language, this is know as scatter/gather and we follow that terminology. However, it is important to remember that in IPython’s Client class, scatter() is from the interactive IPython session to the engines and gather() is from the engines back to the interactive IPython session. For scatter/gather operations between engines, MPI, pyzmq, or some other direct interconnect should be used.

In [39]:
vobject.scatter('a',range(16))

<AsyncResult: scatter>

In [40]:
vobject['a']

[range(0, 4), range(4, 8), range(8, 12), range(12, 16)]

In [42]:
vobject.gather('a')

<AsyncMapResult: gather>

# parallel list comprehensions

In many cases list comprehensions are nicer than using the map function. While we don’t have fully parallel list comprehensions, it is simple to get the basic effect using scatter() and gather():

The %px magic executes a single Python command on the engines specified by the targets attribute of the view instance

In [43]:
vobject.scatter('x', range(64))
#Parallel execution on engines: [0, 1, 2, 3]
%px y = [i**10 for i in x]
y = vobject.gather('y')
print(y.get()[-10:])

[210832519264920576, 253295162119140625, 303305489096114176, 362033331456891249, 430804206899405824, 511116753300641401, 604661760000000000, 713342911662882601, 839299365868340224, 984930291881790849]


# example: monte carlo approximation of pi

A simple toy problem to get a handle on multiple engines is a Monte Carlo approximation of π.

Let’s say we have a dartboard with a round target inscribed on a square board. If you threw darts randomly, and they land evenly distributed on the square board, how many darts would you expect to hit the target?

In [44]:
from random import random
from math import pi
vobject['random'] = random

In [45]:
def mcpi(nsamples):
    s = 0
    for i in range(nsamples):
        x = random()
        y = random()
        if x*x + y*y <= 1:
            s+=1
    return 4.*s/nsamples
    
def multi_mcpi(view, nsamples):
    p = len(view.targets)
    if nsamples % p:
        # ensure even divisibility
        nsamples += p - (nsamples%p)
    
    subsamples = nsamples//p
    
    ar = view.apply(mcpi, subsamples)
    return sum(ar)/p

def check_pi(tol=1e-5, step=10, verbose=False):
    guess = 0
    spi = pi
    steps = 0
    while abs(spi-guess)/spi > tol:
        for i in range(step):
            x = random()
            y = random()
            if x*x+y*y <= 1:
                guess += 4.
        steps += step
        spi = pi*steps
        if verbose:
            print(spi, guess, abs(spi-guess)/spi)
    return steps, guess/steps

In [50]:
%%time
mcpi(int(1e9))
# 1e7 means 10 to the 7th power.  "e" stands for "expontent"

CPU times: user 5min 37s, sys: 0 ns, total: 5min 37s
Wall time: 5min 38s


3.141515788

In [48]:
%%time
multi_mcpi(vobject, int(1e9))

CPU times: user 21.4 ms, sys: 0 ns, total: 21.4 ms
Wall time: 872 ms


3.1411488

In [49]:
check_pi()

(5000, 3.1416)