# Going parallel with IPython: the basics

## Checking execution in a cluster

Here we work with clusters of IPython engines running on your own computer. The easiest way to start one is to go back to the Notebook main interface, open the "Clusters" tab, type 4 in the "# of engines" field and press Start (alternatively, run `ipcluster start -n 4` from a terminal).
If you execute the following code, you will see the ids of the four engines you have started.

In [1]:
from IPython import parallel
rc = parallel.Client()
rc.ids

[0, 1, 2, 3]

To try a simple execution on the cluster, we will use the following function, which adds up n random integers between 1 and 100.

In [2]:
def longrun(n):
    from random import randint
    bignumber = 0
    for i in range(n):
        bignumber += randint(1,100)
    return bignumber

In [3]:
%timeit longrun(10000)

100 loops, best of 3: 11.6 ms per loop


That run took place in the notebook's own kernel process, not on the cluster. We can see that process's id as follows:

In [4]:
from os import getpid
print getpid()

1812


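Now it is time to execute the function on the cluster. Indexing the client, as in rc[3], selects an engine (here engine 3). apply_sync runs the function there synchronously: the call blocks until the engine sends back the result.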

In [5]:
%timeit rc[3].apply_sync(longrun, 10000)

The slowest run took 13.85 times longer than the fastest. This could mean that an intermediate result is being cached 
1 loops, best of 3: 15.6 ms per loop


In [6]:
rc[0].apply_sync(getpid)

1804
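`apply_sync` behaves like calling `apply_async` and then immediately waiting on the `AsyncResult` it returns (via its `get()` method). The sketch below is a rough, stdlib-only analogy and not IPython's API. A single-worker `concurrent.futures.ThreadPoolExecutor` stands in for one engine: `submit` plays the asynchronous call, and `Future.result()` plays the blocking wait.

```python
from concurrent.futures import ThreadPoolExecutor
from random import randint


def longrun(n):
    # Same function as above: add up n random integers between 1 and 100
    bignumber = 0
    for _ in range(n):
        bignumber += randint(1, 100)
    return bignumber


# A one-thread executor stands in for a single engine (an analogy only)
with ThreadPoolExecutor(max_workers=1) as engine:
    future = engine.submit(longrun, 10000)  # "async": returns a placeholder at once
    result = future.result()                # "sync": block until the value arrives

print(result)
```

The asynchronous form is useful when you want to start work on the cluster and keep using the notebook while it runs.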

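Engine 0 reports process id 1804 rather than the notebook's 1812, so the call really ran in a separate process. You can run code on several engines at once simply by referring to a view, i.e. a set of engines selected by indexing the client: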

In [7]:
rc[:].apply_sync(getpid)

[1804, 1805, 1806, 1807]

In [8]:
%timeit rc[:].apply_sync(longrun, 10000)

10 loops, best of 3: 32.5 ms per loop


In [9]:
print len(rc[1:3])
print type(rc[:])

2
<class 'IPython.parallel.client.view.DirectView'>


This is called a <b>direct view</b>. It works as a multiplexer: it sends the same call to every engine it contains and returns one result per engine, in engine order.
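The following stdlib-only sketch illustrates what "multiplexer" means here. It models each engine as a one-thread `ThreadPoolExecutor`, which is an analogy and not how IPython engines are built. The hypothetical helper `multiplex` sends the same call to every engine in a list and returns one result per engine, in order, much like `rc[:].apply_sync(getpid)` above. Slicing the list plays the role of `rc[1:3]`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def multiplex(engines, func, *args):
    """Hypothetical helper: run func(*args) on every engine, results in engine order."""
    futures = [engine.submit(func, *args) for engine in engines]  # send to all first
    return [f.result() for f in futures]                          # then wait for each


# Four one-thread executors stand in for engines 0..3
engines = [ThreadPoolExecutor(max_workers=1) for _ in range(4)]
try:
    all_ids = multiplex(engines, threading.get_ident)       # like rc[:].apply_sync(getpid)
    some_ids = multiplex(engines[1:3], threading.get_ident)  # like rc[1:3]
finally:
    for engine in engines:
        engine.shutdown()

print(len(all_ids), len(set(all_ids)), len(some_ids))  # 4 4 2
```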

Python's <a href="https://docs.python.org/2/library/functions.html#map">map built-in</a> applies a function to each element of a list. The following example calls longrun four times, once for each element in the list passed. All four calls run one after another in the notebook's own process.

In [10]:
# This is the regular map built-in in Python:
print map(longrun, [1000, 500, 1000, 200])
%timeit map(longrun, [1000, 500, 1000, 200])

[49164, 24479, 50544, 9895]
100 loops, best of 3: 3.2 ms per loop


Views have a map method that works like the built-in but spreads the calls among the engines; map_sync is its blocking form. Here we use it on a load-balanced view:

In [11]:
view = rc.load_balanced_view()
view.map_sync(longrun, [1000, 500, 1000, 200])
%timeit view.map_sync(longrun, [1000, 500, 1000, 200])

100 loops, best of 3: 14.9 ms per loop
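Like the built-in, `map_sync` returns results in the order of the input list, whichever engine ran each call. The stdlib sketch below checks the same property, with a `ThreadPoolExecutor` standing in for the engines purely for illustration. It uses `checksum`, a hypothetical deterministic replacement for `longrun`, so that the parallel and sequential results can be compared exactly.

```python
from concurrent.futures import ThreadPoolExecutor


def checksum(n):
    # Hypothetical deterministic stand-in for longrun: sum of 1..n
    return sum(range(1, n + 1))


inputs = [1000, 500, 1000, 200]
sequential = list(map(checksum, inputs))         # built-in map (lazy in Python 3)

with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(checksum, inputs))  # calls spread over worker threads

print(parallel)                # [500500, 125250, 500500, 20100]
print(parallel == sequential)  # True
```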


In [12]:
type(view)

IPython.parallel.client.view.LoadBalancedView

Here the controller does the load balancing. Rather than choosing engines ourselves, we leave it to the controller's scheduler to decide which engine runs each call.
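As a rough illustration of leaving decisions to a scheduler, here is a toy least-loaded assignment written for this note; it is not IPython's scheduler code. Each task goes to the engine with the smallest accumulated load so far. Treating the input values as known task costs is a simplifying assumption: a real scheduler cannot know how long a task will take before running it.

```python
import heapq


def assign_least_loaded(task_costs, n_engines):
    """Toy scheduler: give each task to the engine with the smallest load so far."""
    heap = [(0, engine) for engine in range(n_engines)]  # (current load, engine id)
    heapq.heapify(heap)
    assignment = []
    for cost in task_costs:
        load, engine = heapq.heappop(heap)  # least-loaded engine (lowest id on ties)
        assignment.append(engine)
        heapq.heappush(heap, (load + cost, engine))
    return assignment


print(assign_least_loaded([1000, 500, 1000, 200], 4))  # [0, 1, 2, 3]
print(assign_least_loaded([1000, 500, 1000, 200], 2))  # [0, 1, 1, 0]
```

With two engines, the third task goes to engine 1 because it has accumulated less work (500) than engine 0 (1000).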

With a direct view, dict syntax pushes variables to, and pulls them from, the namespaces of the engines in that view:

In [13]:
rc[1]["myVarIn1"] = "I am in 1"

In [14]:
print myVarIn1

NameError: name 'myVarIn1' is not defined

In [15]:
print rc[1]["myVarIn1"]
print rc[0]["myVarIn1"]

I am in 1


RemoteError: NameError(name 'myVarIn1' is not defined)
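Each engine has its own namespace. That is why the variable exists on engine 1 but not on engine 0 or in the notebook. The same transfers can also be written with the explicit view methods `rc[1].push({"myVarIn1": "I am in 1"})` and `rc[1].pull("myVarIn1", block=True)`. The toy model below mimics only this namespace behaviour. It uses a hypothetical `ToyEngine` class, which is not part of IPython.

```python
class ToyEngine:
    """Hypothetical stand-in for an engine: just a private namespace."""

    def __init__(self):
        self.namespace = {}

    def __setitem__(self, name, value):
        # Like view[name] = value: push a variable into this engine only
        self.namespace[name] = value

    def __getitem__(self, name):
        # Like view[name]: pull a variable, failing if this engine never got it
        if name not in self.namespace:
            raise NameError("name %r is not defined" % name)
        return self.namespace[name]


engines = [ToyEngine() for _ in range(4)]
engines[1]["myVarIn1"] = "I am in 1"
print(engines[1]["myVarIn1"])  # I am in 1
try:
    engines[0]["myVarIn1"]
except NameError as err:
    print("engine 0:", err)    # engine 0: name 'myVarIn1' is not defined
```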

If we pass parallel.Reference("x") as an argument to apply(), we refer to a variable x that already lives on the engines, and each engine substitutes its own x. This way we avoid copying the same data over the network in every call to apply. If x is a large array, this can save a lot of time.
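To get a feel for the savings, the stdlib sketch below compares the pickled size of a large list (roughly what sending it by value costs) with the size of a small name-only placeholder. The helpers `reference` and `resolve` are hypothetical illustrations of the idea, not how `parallel.Reference` is implemented. With IPython, the pattern would be one `rc[:].push({"x": big_array})` followed by calls such as `rc[:].apply_sync(f, parallel.Reference("x"))`.

```python
import pickle


def reference(name):
    """Hypothetical placeholder for a remote variable: just a tagged name."""
    return ("__ref__", name)


def resolve(arg, namespace):
    # Engine side: swap a placeholder for the object already stored there
    if isinstance(arg, tuple) and len(arg) == 2 and arg[0] == "__ref__":
        return namespace[arg[1]]
    return arg


big = list(range(100000))                        # stand-in for a large array
by_value = len(pickle.dumps(big))                # bytes shipped on every call
by_name = len(pickle.dumps(reference("x")))      # bytes shipped with a placeholder

engine_namespace = {"x": big}                    # pretend x was pushed once earlier
print(by_value > 100 * by_name)                           # True
print(resolve(reference("x"), engine_namespace) is big)   # True
```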

## Partitioning data

Clusters start to become useful when you can partition your data and have each engine operate on its own piece.

In [16]:
import numpy as np
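%px import numpy as np # %px runs the import on every engine as well
def CostlyOperation():
    # Reads the global a: the whole array in the notebook, or the
    # engine's own chunk when called through apply on a view
    global a  
    array = np.copy(a)
    return array*array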

scatter partitions an array among the engines of a view, and gather collects the pieces back into a single array.

In [23]:
a = np.random.randn(200000)+1
print a, len(a)
other_view = rc[:]
print other_view

other_view.scatter('a', a)
# With other_view['a'] you can see the partitions
other_view['a']



[ 1.59055493  2.12341237  1.25481426 ...,  0.23470997  1.83227583
  0.52413171] 200000
<DirectView [0, 1, 2, 3]>


[array([ 1.59055493,  2.12341237,  1.25481426, ...,  0.05001647,
        -0.18774725, -0.95678693]),
 array([ 0.97191601,  2.40042363, -1.11058922, ...,  2.79939157,
        -0.31333175,  1.23305503]),
 array([ 1.43366825,  0.7725019 , -0.19259581, ...,  1.84885727,
         0.18619352,  2.23002061]),
 array([ 1.8583023 ,  0.59344644,  1.20996072, ...,  0.23470997,
         1.83227583,  0.52413171])]
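The partitioning is contiguous. With 200000 elements and four engines, each engine gets a block of 50000. As the output shows, the first block starts where a starts and the last block ends where a ends. The helper below is a minimal sketch of such a block split, written for this note rather than taken from IPython. When the length does not divide evenly, it gives the leftover elements to the first blocks, one each; that remainder rule is an assumption of the sketch.

```python
def block_partition(seq, n_parts):
    """Split seq into n_parts contiguous blocks whose sizes differ by at most one."""
    base, extra = divmod(len(seq), n_parts)
    blocks, start = [], 0
    for i in range(n_parts):
        size = base + (1 if i < extra else 0)  # first `extra` blocks get one more item
        blocks.append(seq[start:start + size])
        start += size
    return blocks


print(block_partition(list(range(10)), 4))                  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
print([len(b) for b in block_partition(range(200000), 4)])  # [50000, 50000, 50000, 50000]
```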

In [25]:

# In this case the parallel run is slower: every engine is on
# the same machine, and each call adds messaging and transfers
# the squared chunks back to the notebook.
%timeit other_view.apply_sync(CostlyOperation)
%timeit CostlyOperation()

other_view.apply_sync(CostlyOperation)

100 loops, best of 3: 11.8 ms per loop
1000 loops, best of 3: 381 µs per loop


[array([  2.52986499e+00,   4.50888010e+00,   1.57455882e+00, ...,
          2.50164760e-03,   3.52490289e-02,   9.15441232e-01]),
 array([ 0.94462073,  5.76203359,  1.23340842, ...,  7.83659316,
         0.09817679,  1.5204247 ]),
 array([ 2.05540466,  0.59675919,  0.03709315, ...,  3.4182732 ,
         0.03466803,  4.97299191]),
 array([ 3.45328744,  0.35217867,  1.46400495, ...,  0.05508877,
         3.35723472,  0.27471405])]

Now you can recover values with gather:

In [26]:
other_view.block=True # Blocking, so gather waits for all the pieces and returns the array

# We can get the original a that was distributed this way:
b = other_view.gather("a")
print b, len(b)

[ 1.59055493  2.12341237  1.25481426 ...,  0.23470997  1.83227583
  0.52413171] 200000
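CostlyOperation squares each element independently. Squaring every scattered block and joining the results in engine order therefore gives the same answer as squaring the whole array at once. The stdlib sketch below checks this with plain lists. The blocks are written out by hand as if they had been scattered, and `gather_blocks` is a hypothetical stand-in for the concatenation step of gather.

```python
def gather_blocks(blocks):
    """Hypothetical stand-in for gather: join blocks back in engine order."""
    joined = []
    for block in blocks:
        joined.extend(block)
    return joined


data = [1.5, -2.0, 0.5, 3.0, -1.0, 2.5, 4.0]
scattered = [data[0:2], data[2:4], data[4:6], data[6:7]]  # blocks, as if on engines 0..3

squared_blocks = [[x * x for x in block] for block in scattered]  # per-engine work
print(gather_blocks(scattered) == data)                           # True
print(gather_blocks(squared_blocks) == [x * x for x in data])     # True
```

This only holds for element-wise operations. Something like a running total would need extra work to combine the pieces correctly.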
