# Using `ipyparallel`

Parallel execution is tightly integrated with Jupyter in the `ipyparallel` package. Install it with:

```bash
pip install ipyparallel
```

This is a rich framework; for more on how to use `ipyparallel`, see the [official documentation](https://ipyparallel.readthedocs.org/en/latest/).

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt

## Starting engines

We will only use engines on local cores, which requires no setup. See the [docs](https://ipyparallel.readthedocs.org/en/latest/process.html) for detailed instructions on setting up a remote cluster, including one on Amazon EC2.

You can start a cluster from the `IPython Clusters` tab in the main Jupyter browser window (when the `ipyparallel` notebook extension is enabled), or from a terminal with `ipcluster start -n 4`, replacing 4 with the number of engines you want.

A major advantage of developing parallel applications with `ipyparallel` is that you can do it interactively within Jupyter.

## Basic concepts of `ipyparallel`

In [None]:
from ipyparallel import Client

The client connects to the cluster of "remote" engines that perform the actual computation. These engines may be on the same machine or spread across a cluster.

In [None]:
rc = Client()

In [None]:
rc.ids

A view provides access to a subset of the engines available to the client, and jobs are submitted to the engines through the view. A direct view lets you explicitly send work to specific engines. The load-balanced view is like the `Pool` object in `multiprocessing`: it manages the scheduling and distribution of jobs for you.
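
For comparison, here is the `multiprocessing` pattern that the load-balanced view resembles. To keep the sketch self-contained it uses `multiprocessing.pool.ThreadPool`, which has the same interface as `Pool` but runs its workers as threads in the current process. The function `row_sum` and the data are made up for illustration; with `ipyparallel`, the analogous call would be `lv.map_sync(row_sum, rows)` on a load-balanced view.

```python
from multiprocessing.pool import ThreadPool


def row_sum(row):
    """Work for one job: sum a single row."""
    return sum(row)


rows = [[i] * 5 for i in range(10)]  # 10 small jobs

with ThreadPool(processes=4) as pool:
    # workers pick up chunks of jobs as they become free;
    # map still returns the results in input order
    totals = pool.map(row_sum, rows)

print(totals)  # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45]
```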

**Direct view**

In [None]:
dv = rc[:]

Add 10 sets of 3 numbers in parallel using all engines.

In [None]:
dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))

Add 10 sets of 3 numbers in parallel using only alternate engines.

In [None]:
rc[::2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))

Add 10 sets of 3 numbers using a specific engine.

In [None]:
rc[2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))

**Load balanced view**

Use this when you have many jobs that take different amounts of time to complete.
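
To see why load balancing helps when job durations vary, here is a toy model, not `ipyparallel`'s actual scheduler. It compares a static split, where each engine receives a fixed contiguous block of jobs up front, with a greedy rule that hands the next job to whichever engine becomes free first. The job durations are invented for illustration.

```python
import heapq


def static_makespan(durations, n_engines):
    """Finish time when each engine gets a fixed contiguous block of jobs up front."""
    size, extra = divmod(len(durations), n_engines)
    finish, start = [], 0
    for k in range(n_engines):
        stop = start + size + (1 if k < extra else 0)
        finish.append(sum(durations[start:stop]))
        start = stop
    return max(finish)


def greedy_makespan(durations, n_engines):
    """Finish time when the next job always goes to the engine that is free first."""
    free_at = [0.0] * n_engines  # min-heap of times at which each engine becomes idle
    heapq.heapify(free_at)
    for d in durations:
        t = heapq.heappop(free_at)  # earliest-idle engine takes the job
        heapq.heappush(free_at, t + d)
    return max(free_at)


jobs = [5, 5, 1, 1]  # two long jobs followed by two short ones
print(static_makespan(jobs, 2))  # 10  (engine 0 runs both long jobs)
print(greedy_makespan(jobs, 2))  # 6.0
```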

In [None]:
lv = rc.load_balanced_view()

In [None]:
lv.map_sync(lambda x: sum(x), np.random.random((10, 100000)))

#### Calling functions with apply

In contrast to `map`, `apply` is a plain function call that runs on every engine in the view, with the usual signature `apply(f, *args, **kwargs)`. It is the primitive on which more convenient functions such as `map` are built.
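
A purely local sketch of these semantics, with a list of labels standing in for the engines in a view. The helpers `apply_on`, `apply_on_all` and `map_via_apply` are hypothetical, not part of `ipyparallel`: `apply` runs the same call once per engine, and a `map` can be layered on top of it by giving each engine its own contiguous slice of the input.

```python
def apply_on(engine, f, *args, **kwargs):
    """Stand-in for running f on one engine; here it simply runs locally."""
    return f(*args, **kwargs)


def apply_on_all(engines, f, *args, **kwargs):
    """Same call on every engine, one result per engine (cf. apply_sync)."""
    return [apply_on(e, f, *args, **kwargs) for e in engines]


def map_via_apply(engines, f, seq):
    """A map built from apply: each engine receives one contiguous chunk of seq."""
    seq = list(seq)
    size, extra = divmod(len(seq), len(engines))
    results, start = [], 0
    for k, e in enumerate(engines):
        stop = start + size + (1 if k < extra else 0)
        chunk = seq[start:stop]
        # the function sent to this engine maps f over its own chunk
        results.extend(apply_on(e, lambda c: [f(x) for x in c], chunk))
        start = stop
    return results


engines = [1, 2]  # stands in for the view rc[1:3]
print(apply_on_all(engines, lambda x, y: x**2 + y**2, 3, 4))  # [25, 25]
print(map_via_apply(engines, lambda x: x * 10, range(5)))     # [0, 10, 20, 30, 40]
```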

In [None]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, 3, 4)

In [None]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, x=3, y=4)

### Synchronous and asynchronous jobs

We have used the `map_sync` and `apply_sync` methods. The `_sync` suffix indicates a synchronous job: the call blocks until all the computation is done and then returns the result.

In [None]:
res = dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))

In [None]:
res

In contrast, asynchronous jobs return immediately with an `AsyncMapResult` object, so you can do other work while the computation runs. This object is similar to the `Future` objects returned by the `concurrent.futures` package: you can query its status, cancel running jobs, and retrieve the results once they have been computed.
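
Since `AsyncMapResult` is compared to futures, here is the same submit / check / collect pattern with the standard library's `concurrent.futures`, running locally in threads rather than on `ipyparallel` engines. It is an analogy only: the `add3` function is made up, and a standard `Future` uses `result()` where `AsyncMapResult` uses `get()`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def add3(x, y, z):
    time.sleep(0.05)  # pretend this is expensive
    return x + y + z


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit returns immediately with one Future per call
    futures = [pool.submit(add3, i, i, i) for i in range(10)]
    # done() does not block; it only reports status
    print("all done yet?", all(f.done() for f in futures))
    # result() blocks until that particular call has finished
    totals = [f.result() for f in futures]

print(totals)  # [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
```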

In [None]:
res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))

In [None]:
res

In [None]:
res.done()

In [None]:
res.get()

There is also a plain `map` method, which runs asynchronously by default because a view's `block` attribute starts out as `False`. You can change this either by setting the view's `block` attribute or by passing `block=True` to the call.

In [None]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))

In [None]:
res.get()

Change blocking mode for just one job.

In [None]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10), block=True)

In [None]:
res

Change blocking mode for this view so that all jobs are synchronous.

In [None]:
dv.block = True

In [None]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))

In [None]:
res


### Remote function decorators 

The `@remote` decorator turns a function into one that executes simultaneously on every engine in the view and returns a list with one result per engine. For example, you could use it to run $n$ independent MCMC chains, one per engine.
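
The idea behind `@remote` can be mimicked locally. The decorator `run_on_all` below is hypothetical, not part of `ipyparallel`: it runs the wrapped function once per simulated engine, each call in its own thread, and returns the list of per-engine results. Unlike `@remote`, the sketch passes a simulated engine id as the first argument so that each "chain" can use its own seeded random generator; that seeding is an assumption of the sketch, made for reproducibility.

```python
import functools
import random
from concurrent.futures import ThreadPoolExecutor


def run_on_all(n_engines):
    """Hypothetical stand-in for @dv.remote(block=True): call f once per engine."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            with ThreadPoolExecutor(max_workers=n_engines) as pool:
                futures = [pool.submit(f, engine_id, *args, **kwargs)
                           for engine_id in range(n_engines)]
                return [fut.result() for fut in futures]  # one result per engine
        return wrapper
    return decorator


@run_on_all(n_engines=4)
def chain(engine_id, n):
    # one independent "chain" per engine, seeded by its id
    rng = random.Random(engine_id)
    return [rng.random() for _ in range(n)]


samples = chain(3)
print(len(samples), [len(s) for s in samples])  # 4 [3, 3, 3, 3]
```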

In [None]:
with dv.sync_imports():
    import numpy

In [None]:
@dv.remote(block=True)
def f1(n):
    return numpy.random.rand(n)

In [None]:
f1(4)

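The `@parallel` decorator partitions its sequence arguments across the engines in the view. Calling the decorated function directly runs it once on each engine with that engine's chunk of the data and collects the per-engine results; calling its `.map` method instead applies it element by element.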
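
A local sketch of that behavior, using a hypothetical `parallel_like` decorator rather than `ipyparallel`'s implementation: calling the wrapped function splits the input into one contiguous chunk per simulated engine and calls the function on each chunk, while its `.map` attribute calls the function on each element. This is why a reduction such as `sum` yields one partial result per engine when the decorated function is called directly.

```python
def split(seq, n):
    """Split seq into n contiguous chunks whose sizes differ by at most one."""
    seq = list(seq)
    size, extra = divmod(len(seq), n)
    chunks, start = [], 0
    for k in range(n):
        stop = start + size + (1 if k < extra else 0)
        chunks.append(seq[start:stop])
        start = stop
    return chunks


def parallel_like(n_engines):
    """Hypothetical stand-in for @dv.parallel(block=True), run locally."""
    def decorator(f):
        def call(seq):
            # direct call: f sees a whole chunk per engine
            return [f(chunk) for chunk in split(seq, n_engines)]
        # .map: f sees one element at a time
        call.map = lambda seq: [f(x) for x in seq]
        return call
    return decorator


@parallel_like(n_engines=4)
def total(x):
    return sum(x)


print(total(range(15)))            # [6, 22, 38, 39]  one partial sum per engine
print(total.map([[1, 2], [3, 4]]))  # [3, 7]  one sum per element
```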

In [None]:
@dv.parallel(block=True)
def f2(x):
    return x

In [None]:
f2(range(15))

In [None]:
@dv.parallel(block=True)
def f3(x):
    return sum(x)

In [None]:
f3(range(15))

In [None]:
@dv.parallel(block=True)
def f4(x, y):
    return x + y

In [None]:
f4(np.arange(10), np.arange(10))

#### Example: Use the `@parallel` decorator to speed up Mandelbrot calculations

In [None]:
def mandel1(x, y, max_iters=80):
    c = complex(x, y)
    z = 0.0j
    for i in range(max_iters):
        z = z*z + c
        if z.real*z.real + z.imag*z.imag >= 4:
            return i
    return max_iters

In [None]:
@dv.parallel(block=True)
def mandel2(x, y, max_iters=80):
    c = complex(x, y)
    z = 0.0j
    for i in range(max_iters):
        z = z*z + c
        if z.real*z.real + z.imag*z.imag >= 4:
            return i
    return max_iters

In [None]:
x = np.arange(-2, 1, 0.01)
y = np.arange(-1, 1, 0.01)
X, Y = np.meshgrid(x, y)

In [None]:
%%time
im1 = np.reshape(list(map(mandel1, X.ravel(), Y.ravel())), (len(y), len(x)))

In [None]:
%%time
im2 = np.reshape(mandel2.map(X.ravel(), Y.ravel()), (len(y), len(x)))

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].grid(False)
axes[0].imshow(im1, cmap='jet')
axes[1].grid(False)
axes[1].imshow(im2, cmap='jet')
pass

### Functions with dependencies

Modules imported locally are NOT automatically available on the remote engines, which run as separate processes with their own namespaces.

In [None]:
import time
import datetime

In [None]:
def g1(x):
    time.sleep(0.1)
    now = datetime.datetime.now()
    return (now, x)

This fails because the `time` and `datetime` modules have not been imported on the remote engines, so the engines raise a `NameError`.

```python
dv.map_sync(g1, range(10))
```
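
Why does this happen? Roughly speaking, the function's code is sent to each engine and run there, so global names such as `time` are looked up in the engine's namespace rather than the notebook's. The sketch below imitates that locally with `types.FunctionType`, rebuilding `g1` around a fresh dictionary that stands in for an engine namespace; this is a simplification of what `ipyparallel` actually does, and `engine_ns` is a hypothetical name.

```python
import builtins
import datetime
import time
import types


def g1(x):
    time.sleep(0.01)
    now = datetime.datetime.now()
    return (now, x)


# Same code object, but globals come from a fresh "engine" namespace
engine_ns = {"__builtins__": builtins}
remote_g1 = types.FunctionType(g1.__code__, engine_ns)

try:
    remote_g1(1)
except NameError as err:
    print("fails remotely:", err)  # name 'time' is not defined

# Importing inside the engine namespace fixes it, much as sync_imports or require do
exec("import time, datetime", engine_ns)
print(remote_g1(1)[1])  # 1
```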

The simplest fix is to import the module(s) *within* the function:

In [None]:
def g2(x):
    import time, datetime
    time.sleep(0.1)
    now = datetime.datetime.now()
    return (now, x)

In [None]:
dv.map_sync(g2, range(5))

Alternatively, you can import modules both locally and on the remote engines at the same time with the `sync_imports` context manager.

In [None]:
with dv.sync_imports():
    import time
    import datetime

Now the `g1` function will work.

In [None]:
dv.map_sync(g1, range(5))

Finally, the `require` decorator makes the remote engines import the listed modules before the decorated function runs.
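
A simplified illustration of the idea behind `require`, written as a hypothetical local decorator `require_modules` (not `ipyparallel`'s implementation): before each call it imports the named modules and binds their top-level package names in the function's global namespace, which is why `g3` can refer to `scipy.stats` without an import statement of its own. The sketch uses the standard library's `json.decoder` so it runs without SciPy.

```python
import functools
import importlib


def require_modules(*names):
    """Hypothetical sketch: import modules into f's globals before each call."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            for name in names:
                importlib.import_module(name)  # importing a submodule also imports its parents
                top = name.split(".")[0]
                f.__globals__[top] = importlib.import_module(top)  # bind e.g. "json"
            return f(*args, **kwargs)
        return wrapper
    return decorator


@require_modules("json.decoder")
def parse(text):
    # "json" is never imported at module level; the decorator provides it
    return json.decoder.JSONDecoder().decode(text)


print(parse('{"a": 1}'))  # {'a': 1}
```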

In [None]:
from ipyparallel import require

In [None]:
@require('scipy.stats')
def g3(x):
    return scipy.stats.norm(0,1).pdf(x)

In [None]:
dv.map(g3, np.arange(-3, 4))

### Moving data around

We can send data to the remote engines with `push` and retrieve it with `pull`, or use the dictionary interface. For example, you can use this to distribute a large lookup table to all engines once, instead of sending it repeatedly as a function argument.
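
One way to picture `push`, `pull` and the dictionary interface is to model each engine's namespace as a plain dictionary. The `ToyDirectView` class below is purely illustrative and its names are hypothetical; the exact return format of the real `pull` may differ.

```python
class ToyDirectView:
    """Hypothetical model of a direct view: each engine's namespace is a dict."""

    def __init__(self, n_engines):
        self.namespaces = [{} for _ in range(n_engines)]

    def push(self, ns):
        # copy the same names into every engine's namespace
        for engine_ns in self.namespaces:
            engine_ns.update(ns)

    def pull(self, names):
        # one list of values per engine
        return [[engine_ns[name] for name in names] for engine_ns in self.namespaces]

    def __setitem__(self, name, value):
        self.push({name: value})

    def __getitem__(self, name):
        return [engine_ns[name] for engine_ns in self.namespaces]


view = ToyDirectView(n_engines=2)
view.push(dict(a=3, b=2))
view["c"] = 5
print(view.pull(("a", "b")))  # [[3, 2], [3, 2]]
print(view["c"])              # [5, 5]
```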

In [None]:
dv.push(dict(a=3, b=2))

In [None]:
def f(x):
    global a, b
    return a*x + b

In [None]:
dv.map_sync(f, range(5))

In [None]:
dv.pull(('a', 'b'))

#### You can also use the dictionary interface as an alternative to push and pull

In [None]:
dv['c'] = 5

In [None]:
dv['a']

In [None]:
dv['c']

## Using parallel magic commands

In practice, the `%px` magic is often the simplest way to use `ipyparallel`: it executes code in parallel on the engines directly from the notebook.

In [None]:
def f(xs):
    s = 0
    for x in xs:
        s += x
    return s

In [None]:
dv.map(f, np.random.random((6, 4)))

### %px

This sends the command to all targeted engines.

In [None]:
%px import numpy as np
%px a = np.random.random(4)
%px a.sum()

#### List comprehensions in parallel

The `scatter` method partitions and distributes data to all engines. The `gather` method does the reverse. Together with `%px`, we can simulate parallel list comprehensions.
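
The scatter / compute / gather pattern can be sketched locally, with a list of chunks standing in for the engines. The `scatter` and `gather` functions here are hypothetical local helpers, not the `DirectView` methods; the partitioning gives each engine a contiguous block and hands leftover elements to the first engines, which is an assumption of the sketch.

```python
def scatter(seq, n_engines):
    """Split seq into contiguous chunks, one per engine; the first len % n get one extra."""
    seq = list(seq)
    size, extra = divmod(len(seq), n_engines)
    bounds = [k * size + min(k, extra) for k in range(n_engines + 1)]
    return [seq[lo:hi] for lo, hi in zip(bounds, bounds[1:])]


def gather(chunks):
    """Inverse of scatter: concatenate the per-engine chunks in engine order."""
    return [x for chunk in chunks for x in chunk]


xs_per_engine = scatter(range(24), 4)
# like `%px y = [x**2 for x in xs]`: each engine runs the comprehension on its own chunk
y_per_engine = [[x**2 for x in xs] for xs in xs_per_engine]
print(gather(y_per_engine) == [x**2 for x in range(24)])  # True
```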

In [None]:
dv.scatter('a', np.random.randint(0, 10, 10))
%px print(a)

In [None]:
dv.gather('a')

In [None]:
dv.scatter('xs', range(24))
%px y = [x**2 for x in xs]
np.array(dv.gather('y'))

#### Running magic functions in parallel

In [None]:
%%px --target [1,3]
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);

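### Running in non-blocking mode

With `--noblock`, `%%px` returns immediately instead of waiting for the engines to finish; `%pxresult` then displays the output of the most recent non-blocking command.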

In [None]:
%%px --target [1,3] --noblock
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);

In [None]:
%pxresult