# Overview over using `ipyparallel`  in the Jupyterhub environment

## The `ipyparallel` module

- Need to start the cluster first 
- Done through the web interface
- You can use different profiles

In [1]:
from ipyparallel import Client

## Creating `DirectView` instances

### Using the janus-node profile

In [2]:
node_profile = 'janus-node'

#### Create a `Client` instance

In [3]:
from ipyparallel import Client
rcn = Client(profile=node_profile)
print(rcn.ids)
ndview = rcn[:]

[0, 1]


This uses the pre-defined profile 'janus-node'

### Using the janus-cpu profile

In [5]:
core_profile = 'janus-cpu'

In [6]:
rcc = Client(profile=core_profile)
print(rcc.ids)
cview = rcc[:]

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]


This client uses the pre-defined profile 'janus-cpu'

## Where is my process running?


In [13]:
import socket
hostname = socket.gethostname()
print(hostname)

cnode0113.rc.int.colorado.edu


- The notebook is running on one of the nodes
- The cluster will be accessed through different mechanism

## Parallel magic commands

### Simple access to the python engines of the cluster

Using command `%px` or cellmagic `%%px`

In [7]:
%%px 
import socket
hostname = socket.gethostname()
print(hostname)

[stdout:0] node0356
[stdout:1] node0356
[stdout:2] node0356
[stdout:3] node0356
[stdout:4] node0356
[stdout:5] node0356
[stdout:6] node0356
[stdout:7] node0356
[stdout:8] node0356
[stdout:9] node0358
[stdout:10] node0358
[stdout:11] node0358
[stdout:12] node0358
[stdout:13] node0358
[stdout:14] node0358
[stdout:15] node0358
[stdout:16] node0358
[stdout:17] node0357
[stdout:18] node0357
[stdout:19] node0357
[stdout:20] node0357
[stdout:21] node0357
[stdout:22] node0357
[stdout:23] node0357
[stdout:24] node0357
[stdout:25] node0357


In [8]:
%%px --target ::2
hostname = socket.gethostname()
print(hostname)

[stdout:0] node0356
[stdout:2] node0356
[stdout:4] node0356
[stdout:6] node0356
[stdout:8] node0356
[stdout:10] node0358
[stdout:12] node0358
[stdout:14] node0358
[stdout:16] node0358
[stdout:18] node0357
[stdout:20] node0357
[stdout:22] node0357
[stdout:24] node0357


### Non-blocking

In [10]:
%%px --noblock
import time
time.sleep(1)
hostname = socket.gethostname()
print(hostname)

<AsyncResult: execute>

In [11]:
%pxresult

[stdout:0] node0356
[stdout:1] node0356
[stdout:2] node0356
[stdout:3] node0356
[stdout:4] node0356
[stdout:5] node0356
[stdout:6] node0356
[stdout:7] node0356
[stdout:8] node0356
[stdout:9] node0358
[stdout:10] node0358
[stdout:11] node0358
[stdout:12] node0358
[stdout:13] node0358
[stdout:14] node0358
[stdout:15] node0358
[stdout:16] node0358
[stdout:17] node0357
[stdout:18] node0357
[stdout:19] node0357
[stdout:20] node0357
[stdout:21] node0357
[stdout:22] node0357
[stdout:23] node0357
[stdout:24] node0357
[stdout:25] node0357


In [14]:
hostname

'cnode0113.rc.int.colorado.edu'

### Auto-parallel

In [15]:
%autopx

%autopx enabled


In [16]:
socket.gethostname()

[0;31mOut[0:5]: [0m'node0356'

[0;31mOut[1:4]: [0m'node0356'

[0;31mOut[2:5]: [0m'node0356'

[0;31mOut[3:4]: [0m'node0356'

[0;31mOut[4:5]: [0m'node0356'

[0;31mOut[5:4]: [0m'node0356'

[0;31mOut[6:5]: [0m'node0356'

[0;31mOut[7:4]: [0m'node0356'

[0;31mOut[8:5]: [0m'node0356'

[0;31mOut[9:4]: [0m'node0358'

[0;31mOut[10:5]: [0m'node0358'

[0;31mOut[11:4]: [0m'node0358'

[0;31mOut[12:5]: [0m'node0358'

[0;31mOut[13:4]: [0m'node0358'

[0;31mOut[14:5]: [0m'node0358'

[0;31mOut[15:4]: [0m'node0358'

[0;31mOut[16:5]: [0m'node0358'

[0;31mOut[17:4]: [0m'node0357'

[0;31mOut[18:5]: [0m'node0357'

[0;31mOut[19:4]: [0m'node0357'

[0;31mOut[20:5]: [0m'node0357'

[0;31mOut[21:4]: [0m'node0357'

[0;31mOut[22:5]: [0m'node0357'

[0;31mOut[23:4]: [0m'node0357'

[0;31mOut[24:5]: [0m'node0357'

[0;31mOut[25:4]: [0m'node0357'

In [17]:
%autopx

%autopx disabled


### %pxconfig

In [20]:
%pxconfig --targets 3::2
%px socket.gethostname()

[0;31mOut[3:5]: [0m'node0356'

[0;31mOut[5:5]: [0m'node0356'

[0;31mOut[7:5]: [0m'node0356'

[0;31mOut[9:5]: [0m'node0358'

[0;31mOut[11:5]: [0m'node0358'

[0;31mOut[13:5]: [0m'node0358'

[0;31mOut[15:5]: [0m'node0358'

[0;31mOut[17:5]: [0m'node0357'

[0;31mOut[19:5]: [0m'node0357'

[0;31mOut[21:5]: [0m'node0357'

[0;31mOut[23:5]: [0m'node0357'

[0;31mOut[25:5]: [0m'node0357'

##  Function Decorators
### `Remote` functions

- Like normal functions
- Execute on one or more engines

In [12]:
@dview.remote(block=True)
def gethostname():
    import socket
    return socket.gethostname()

gethostname()

['node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0838',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0840',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839',
 'node0839']

In [34]:
@cview.remote(block=True)
def p10(x):
    return x**10

serial_result=map(lambda x:x**10, range(32))
cview.map_sync(p10, range(32))

PicklingError: Can't pickle <function p10 at 0x2b8ac0a32e18>: it's not the same object as __main__.p10

### `Parallel` functions

#### Simpel element wise parallel computing

In [14]:
import numpy as np
A = np.random.random((128,128))

@dview.parallel(block=True)
def parallel_multiply(A,B):
    return A*B


Serial for comparision

In [15]:
C_serial = A*A

In [16]:
C_parallel = parallel_multiply(A,A)

In [17]:
(C_serial==C_parallel).all()

True

## Distributing and collecting data 
### Gather data from engines
#### Blocking

In [28]:
cview.block=True
cview.gather('hostname')

['node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357']

#### Non-blocking gather

In [29]:
cview.block=False
r = cview.gather('hostname')
cview.wait(r)
r.get()

['node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0356',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0358',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357',
 'node0357']

## Example: Monte-Carlo $\pi$

First a standard implementation. The following code runs on the notebook server

In [3]:
import random
 
def mc_pi(n):
    count = 0.0
    for i in range(n):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) <= 1:
            count += 1.0
    return count/float(n)*4.


In [4]:
mc_pi(10**7)

3.1416124

### Map and reduce serially

This runs on the notebook server and not on any of the engines of the cluster

In [5]:
from functools import reduce
from operator import add

serial_result = list(map(mc_pi, [10**6, 10**6]))
print(serial_result)
reduce(add, serial_result) / (len(serial_result))

[3.145988, 3.14154]


3.143764

### Running on the engines

**Using a DirectView and then use a synchronous map**

Goal:
- Running 24 instances of the `mc_pi` function
- Get the results back in a list and then do a reduce on the list

In [38]:
%%px
import random

@dview.remote(block=True)
def mc_pi(n):
    count = 0.
    for i in range(n):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) <= 1:
            count += 1.
    return count/n*4.

Let's run the mc_pi function on each of the engines

In [39]:
nengines = len(rcc.ids)
results = cview.map_sync(mc_pi, [10**6]*nengines)
print(results)

NameError: name 'mc_pi' is not defined

In [12]:
res_pi = sum(results)/len(results)
print(res_pi)

3.141517666666666


## Load balanced view

- Use when each function may take different times

In [13]:
lbview = rcc.load_balanced_view()
results = lbview.map_sync(mc_pi, [10**6]*50)
print(results)

[3.142056, 3.141544, 3.141184, 3.140612, 3.139964, 3.142568, 3.13822, 3.138336, 3.143684, 3.141888, 3.14274, 3.141716, 3.141136, 3.140384, 3.14318, 3.1437, 3.142548, 3.14154, 3.141528, 3.141852, 3.143092, 3.141592, 3.142836, 3.140804, 3.1412, 3.141544, 3.140208, 3.142688, 3.138908, 3.142596, 3.140712, 3.140672, 3.140216, 3.137944, 3.14374, 3.14302, 3.143228, 3.141328, 3.142232, 3.139264, 3.14118, 3.142352, 3.14116, 3.139972, 3.140468, 3.143428, 3.144848, 3.144036, 3.142412, 3.142376]


## DAGs for task dependencies

In [2]:
import networkx as nx

G = nx.DiGraph()

# add 5 nodes, labeled 0-4:
map(G.add_node, range(5))
# 1,2 depend on 0:
G.add_edge(0,1)
G.add_edge(0,2)
# 3 depends on 1,2
G.add_edge(1,3)
G.add_edge(2,3)
# 4 depends on 1
G.add_edge(1,4)

# now draw the graph:
pos = { 0 : (0,0), 1 : (1,1), 2 : (-1,1),
        3 : (0,2), 4 : (2,2)}
nx.draw(G, pos, edge_color='r')

## Simpel element wise parallel computing

In [14]:
import numpy as np
A = np.random.random((128,128))

@dview.parallel(block=True)
def parallel_multiply(A,B):
    return A*B


Serial

In [15]:
C_serial = A*A

In [16]:
C_parallel = parallel_multiply(A,A)

In [17]:
(C_serial==C_parallel).all()

True

In [1]:
import IPython
IPython.__version__

'4.2.0'