# Introduction to ipyparallel and deploying an IPython cluster using Mesos and Docker

The Analytics Services team here at Activision are heavy users of [Mesos](http://mesos.apache.org/) and [Marathon](https://github.com/mesosphere/marathon) to deploy and manage services on our clusters. We are also huge fans of Python and the Jupyter project.

The [Jupyter](http://jupyter.org/) project recently reorganized out of IPython, in a move referred to as "the split". One piece that was originally part of IPython (`IPython.parallel`) was split off into a separate project, [ipyparallel](https://github.com/ipython/ipyparallel). This powerful component of the IPython ecosystem is generally overlooked.

In this post I will give a quick introduction to the ipyparallel project and then introduce a new launcher we have open sourced to deploy IPython clusters into Mesos clusters. While we have published this notebook in HTML, please feel free to download the [original](https://github.com/ActivisionGameScience/ActivisionGameScience.github.io/blob/master/_notebooks/IPython%20Parallel%20Introduction.ipynb) to follow along.

## Introduction to ipyparallel

The ipyparallel project is the new home of the `IPython.parallel` module that was hosted within IPython core before 2015. The focus of the project is interactive cluster computing. I believe this focus on interactive computing and first-class integration with the IPython project is a distinguishing feature. For a more complete dive into the internals of ipyparallel, please visit the [docs](https://ipyparallel.readthedocs.org/en/latest/intro.html). I aim to give the bare minimum to get you started.

At the most basic level, an IPython cluster is a set of Python interpreters that can be accessed over TCP. Under the hood it works much like Jupyter/IPython does today. When you open a new notebook in the browser, a Python process (called a kernel) is started to run the code you submit. ipyparallel does the same thing, except that instead of a single Python kernel you can start many, distributed over many machines.

There are three main components to the stack. 
- Client: A Python process that submits work. Usually this is an IPython session or a Jupyter notebook.
- Controller: The central coordinator. Accepts work from the client and passes it to engines, then collects results and sends them back to the client.
- Engine: Roughly equivalent to an IPython kernel. A Python interpreter that communicates with the controller to accept work and submit results.


## Starting your first cluster

The easiest way to get your hands dirty is to spin up a cluster locally. That is, you will run a Client, Controller and Engines all on your local machine. The hardest part of provisioning distributed clusters is making sure all the pieces can talk to each other (as usual, the easiest solution to a distributed problem is to make it local).


### Getting your environment started

Our team are big users of [conda](http://conda.pydata.org/) to help manage our dependencies (Python and beyond). Here is a quick run-through to get set up. A combination of pip and virtualenv will also work, but once you start installing packages from the SciPy stack we find conda the easiest to use.

First, find your version of Miniconda [here](http://conda.pydata.org/miniconda.html).

If you're using Linux, these commands will work:

```bash
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh # follow prompts
conda update --all
# make a new python 3 env named py3
conda create -n py3 python=3 ipython ipyparallel ipython-notebook
source activate py3
```

There are lower-level commands to start and configure Controllers and Engines, but the primary command you will use is `ipcluster`. This helpful utility starts all the components and configures your local client. By default it uses the `LocalControllerLauncher` and the `LocalEngineSetLauncher`, which is exactly what we want to start with.

Open a terminal (with the `py3` environment activated, so `ipyparallel` is installed) and start a cluster.

```bash
(py3)➜ ipcluster start --n=4
2016-04-11 22:24:15.514 [IPClusterStart] Starting ipcluster with [daemon=False]
2016-04-11 22:24:15.515 [IPClusterStart] Creating pid file: /home/vagrant/.ipython/profile_default/pid/ipcluster.pid
2016-04-11 22:24:15.515 [IPClusterStart] Starting Controller with LocalControllerLauncher
2016-04-11 22:24:16.519 [IPClusterStart] Starting 2 Engines with LocalEngineSetLauncher
2016-04-11 22:24:46.633 [IPClusterStart] Engines appear to have started successfully
```

```python
# Alternatively, start the cluster from inside the notebook with a shell escape.
# Errors are harder to see this way, and stopping the cluster can be janky.
!ipcluster start -n 4 --daemonize
```

If started correctly, we should now have four engines running on our local machine. Now to actually interact with them. First we need to import and create the client.

```python
import ipyparallel as ipp
rc = ipp.Client()
```

```python
rc.ids  # list the ids of the engines the client can communicate with
```

```
[0, 1, 2, 3]
```

The client has two primary ways to farm out work to the engines. The first is a direct view, which is used to apply the same work to all engines. To create a `DirectView`, just slice the client.

The second way is a `LoadBalancedView` which we will cover later in the post.

```python
dv = rc[:]
dv
```

```
<DirectView [0, 1, 2, 3]>
```

With a direct view you can send a function to be executed within each engine's Python process.

```python
def get_engine_pid():
    import os
    return os.getpid()

dv.apply_sync(get_engine_pid)
```

```
[31183, 31184, 31186, 31188]
```

This pattern is so common that ipyparallel provides an IPython cell magic to execute a code cell on all engines: `%%px`.

```python
%%px
import os
os.getpid()
```

```
Out[0:1]: 31183
Out[1:1]: 31184
Out[2:1]: 31186
Out[3:1]: 31188
```

It is key to notice that the engines are full, stateful Python interpreters. If you set a variable within a `%%px` code block, it will remain there.

```python
%%px
foo = 'bar on pid {}'.format(os.getpid())
```

```python
%%px
foo
```

```
Out[0:3]: 'bar on pid 31183'
Out[1:3]: 'bar on pid 31184'
Out[2:3]: 'bar on pid 31186'
Out[3:3]: 'bar on pid 31188'
```

The `DirectView` object provides some syntactic sugar to help distribute data to each engine. The first is dictionary-style retrieval and assignment. First, let's retrieve the value of `foo` from each engine.

```python
dv['foo']
```

```
['bar on pid 31183',
 'bar on pid 31184',
 'bar on pid 31186',
 'bar on pid 31188']
```

Now we can overwrite its value.

```python
dv['foo'] = 'bar'
dv['foo']
```

```
['bar', 'bar', 'bar', 'bar']
```
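To make the semantics concrete, here is a toy, single-process model of what dictionary-style access on a `DirectView` does: reads collect one value per engine, and writes push the same value to every engine, whose namespaces persist between calls. `ToyDirectView` is hypothetical and purely illustrative; real engines are separate processes and values are serialized over the network. In ipyparallel itself, the explicit equivalents of this sugar are `dv.pull('foo')` and `dv.push({'foo': 'bar'})`.

```python
class ToyDirectView:
    """Hypothetical single-process model of DirectView's dict-style access."""

    def __init__(self, n_engines):
        # each dict stands in for one engine's persistent global namespace
        self.namespaces = [{} for _ in range(n_engines)]

    def execute(self, code):
        # like %%px: run the same source in every namespace; state persists
        for ns in self.namespaces:
            exec(code, ns)

    def __getitem__(self, name):
        # like dv['foo']: one value per engine
        return [ns[name] for ns in self.namespaces]

    def __setitem__(self, name, value):
        # like dv['foo'] = 'bar': the same value is pushed to every engine
        for ns in self.namespaces:
            ns[name] = value


toy = ToyDirectView(4)
toy.execute("foo = 'bar'")
toy.execute("foo = foo + '!'")  # reuses state left by the previous call
print(toy['foo'])  # ['bar!', 'bar!', 'bar!', 'bar!']
toy['foo'] = 'baz'
print(toy['foo'])  # ['baz', 'baz', 'baz', 'baz']
```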

There are many cases where you don't want the same data on each machine, but rather you want to chunk a list and distribute each chunk to an engine. The `DirectView` provides the `.scatter` and `.gather` methods for this.

```python
# start with a list of ids to work on
user_ids = list(range(1000))
dv.scatter('user_id_chunk', user_ids)
```

```
<AsyncResult: scatter>
```

Notice that this method returned almost immediately with an `AsyncResult`. All the methods we have used up to now have been blocking and synchronous. The `scatter` method is async. To turn this scatter into a blocking call, we can chain a `.get()` onto it.

```python
dv.scatter('user_id_chunk', user_ids).get()
```

```
[None, None, None, None]
```
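If the async-versus-blocking distinction is new to you, the standard library's `concurrent.futures` offers a close local analogue: submitting work returns a `Future` immediately, and `.result()` blocks until the value is ready, much as `.get()` does on an `AsyncResult`. This is a single-machine sketch with threads, not ipyparallel itself, and `slow_square` is a hypothetical stand-in for real work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    """Hypothetical stand-in for work that takes a moment."""
    time.sleep(0.1)
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() hands back a Future straight away, much like scatter()
    # hands back an AsyncResult
    futures = [pool.submit(slow_square, x) for x in range(4)]
    # result() blocks until that value is ready, much like AsyncResult.get()
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9]
```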

Now each engine holds a variable containing an equal share of the original list.

```python
%%px
print("Len", len(user_id_chunk))
print("Max", max(user_id_chunk))
```

```
[stdout:0]
Len 250
Max 249
[stdout:1]
Len 250
Max 499
[stdout:2]
Len 250
Max 749
[stdout:3]
Len 250
Max 999
```
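The chunk sizes and maxima above are consistent with splitting the list into contiguous, nearly equal pieces. The sketch below reproduces that partitioning in plain Python; `partition` is a hypothetical helper for illustration, not ipyparallel's internal code, and the choice to give leftover elements to the first chunks is an assumption.

```python
def partition(seq, n):
    """Split seq into n contiguous chunks whose lengths differ by at most one.

    Illustrative only: it mirrors the chunk boundaries printed above,
    not necessarily ipyparallel's internal implementation.
    """
    q, r = divmod(len(seq), n)
    chunks = []
    start = 0
    for i in range(n):
        # the first r chunks absorb one leftover element each
        stop = start + q + (1 if i < r else 0)
        chunks.append(seq[start:stop])
        start = stop
    return chunks


chunks = partition(list(range(1000)), 4)
for i, chunk in enumerate(chunks):
    print(i, len(chunk), max(chunk))
# 0 250 249
# 1 250 499
# 2 250 749
# 3 250 999
```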


Let's apply a simple function to each chunk. First, declare a function within each engine. The `--local` flag also executes the code block in your local client, which is very useful for debugging your code.

```python
%%px --local
def the_most_interesting_transformation_ever(user_id):
    """
    This function is really interesting
    """
    return "ID:{}".format(user_id * 3)
```

```python
the_most_interesting_transformation_ever(1)
```

```
'ID:3'
```

```python
%%px
transformed_user_ids = list(map(the_most_interesting_transformation_ever, user_id_chunk))
```

Now we have four separate lists of transformed ids. We want to stitch the disparate lists into one list in our local notebook; `gather` is used for that.

```python
all_transformed_user_ids = dv.gather('transformed_user_ids').get()
```

```python
print(len(all_transformed_user_ids))
print(all_transformed_user_ids[0:10])
```

```
1000
['ID:0', 'ID:3', 'ID:6', 'ID:9', 'ID:12', 'ID:15', 'ID:18', 'ID:21', 'ID:24', 'ID:27']
```
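The whole scatter, compute, gather round trip has the same shape as a local map over chunks followed by a concatenation. The sketch below expresses that shape with the standard library's `concurrent.futures` as a single-machine stand-in for the cluster; it is not ipyparallel, and threads are used only to keep it short (because of the GIL they will not speed up pure-Python CPU work). The chunk size and worker count are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def the_most_interesting_transformation_ever(user_id):
    return "ID:{}".format(user_id * 3)


def transform_chunk(chunk):
    # the per-engine step: map the function over one chunk
    return list(map(the_most_interesting_transformation_ever, chunk))


user_ids = list(range(1000))
n_workers = 4
size = -(-len(user_ids) // n_workers)  # ceiling division
# "scatter": contiguous chunks, one per worker
chunks = [user_ids[i:i + size] for i in range(0, len(user_ids), size)]

with ThreadPoolExecutor(max_workers=n_workers) as pool:
    # executor.map returns the per-chunk results in chunk order
    per_chunk = list(pool.map(transform_chunk, chunks))

# "gather": concatenate the per-chunk results in order
all_transformed = [item for chunk in per_chunk for item in chunk]
print(len(all_transformed))  # 1000
print(all_transformed[:3])   # ['ID:0', 'ID:3', 'ID:6']
```

Because the chunks are contiguous and reassembled in order, the gathered list lines up with the original `user_ids`, which is the same property the notebook output above shows.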


Obviously this example is contrived. The serialization cost of shipping Python objects over the wire to each engine is more expensive than the calculation we performed. This tradeoff between serialization/transport cost and computation cost is central to any decision to use distributed processing. However, there are many highly parallelizable problems where this project can be extremely useful. Some of the main use cases we use ipyparallel for are hyperparameter searches and bulk loading/writing from storage systems.
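A back-of-the-envelope way to reason about that tradeoff: if the computation splits evenly over the engines but the serialization/transport overhead does not, then the parallel time is roughly the compute time divided by the number of engines plus the overhead. The function below encodes that simplified model; it is an assumption for intuition, not a measurement of ipyparallel, and the numbers fed to it are hypothetical.

```python
def estimated_speedup(compute_s, overhead_s, n_engines):
    """Rough model (an assumption, not a measurement): the computation
    splits evenly across engines, while the serialization/transport
    overhead is paid on top of it."""
    parallel_s = compute_s / n_engines + overhead_s
    return compute_s / parallel_s


# Hypothetical: 1 ms of work vs 50 ms of overhead -> parallel is much slower
print(round(estimated_speedup(0.001, 0.05, 4), 3))  # 0.02
# Hypothetical: 100 s of work vs 0.5 s of overhead -> close to the ideal 4x
print(round(estimated_speedup(100, 0.5, 4), 2))     # 3.92
```

A speedup below 1 means the distributed version is slower, which is roughly the situation of the toy transformation above.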

### LoadBalancedView

The previous example, where you scatter a list, perform a calculation and then gather the result, works for lots of problems. One issue with this approach is that each engine receives an identical amount of work up front. If the complexity of the process each engine is performing is variable, this naive scheduling approach can waste processing power and time. Take for example this function:

```python
%%px --local
import random
import time

def fake_external_io(url):
    # Simulate variable complexity/latency
    time.sleep(random.random())
    return "HTML for URL: {}".format(url)
```

```python
%time fake_external_io(1)
```

```
CPU times: user 1.01 ms, sys: 244 µs, total: 1.26 ms
Wall time: 434 ms

'HTML for URL: 1'
```

```python
%time fake_external_io(1)
```

```
CPU times: user 2.24 ms, sys: 0 ns, total: 2.24 ms
Wall time: 835 ms

'HTML for URL: 1'
```

If you had a list of URLs to scrape and gave each worker an equal share, some workers would finish early and sit around doing nothing. A better approach is to assign work to each engine as soon as it finishes its previous task. This way the work is load balanced over the cluster and your process completes sooner. ipyparallel provides the `LoadBalancedView` for this exact use case. For this specific problem, threading or an async event loop would likely be a better approach to speeding up or scaling out, but suspend your disbelief for this exercise.
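The scheduling difference can be simulated without a cluster. The sketch below computes the finish time (makespan) for a list of task durations under two policies: a static split where each worker gets one contiguous, equal-sized share, and greedy assignment of each task to whichever worker frees up first. This models the idea behind load balancing; it is not how ipyparallel's scheduler is implemented, and the durations are made up.

```python
import heapq
import random


def static_makespan(durations, n_workers):
    """Finish time when each worker gets one contiguous, equal-sized share up front."""
    size = -(-len(durations) // n_workers)  # ceiling division
    return max(sum(durations[i:i + size]) for i in range(0, len(durations), size))


def balanced_makespan(durations, n_workers):
    """Finish time when each task goes to whichever worker frees up first."""
    free_at = [0.0] * n_workers  # min-heap of the time each worker next becomes free
    for d in durations:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + d)
    return max(free_at)


# Skewed case: all the slow tasks sit at the front of the list
skewed = [1.0] * 25 + [0.1] * 75
print(static_makespan(skewed, 4))              # 25.0
print(round(balanced_makespan(skewed, 4), 1))  # 8.2

# Random durations, like the sleeps in fake_external_io
random.seed(0)
uniform = [random.random() for _ in range(100)]
print(round(static_makespan(uniform, 4), 1), round(balanced_makespan(uniform, 4), 1))
```

Greedy assignment is not guaranteed to beat a lucky static split on every input (durations `[1, 1, 2]` on two workers finish at 2 statically but at 3 greedily), but it never leaves one worker stuck with all the slow tasks the way the static split does in the skewed case.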

```python
lview = rc.load_balanced_view()
lview
```

```
<LoadBalancedView None>
```

```python
@lview.parallel()
@ipp.require('time', 'random')
def p_fake_external_io(url):
    # Simulate variable complexity/latency
    time.sleep(random.random())
    return "HTML for URL: {}".format(url)
```

Here we used two ipyparallel decorators. First, we used `lview.parallel()` to declare this a parallel function. Second, we used `ipp.require` to declare that this function depends on the modules `time` and `random`. Now that we have a load-balanced function, we can compare timings with our naive approach.

```python
urls = ['foo{}.com'.format(i) for i in range(100)]
```

```python
# Naive single-threaded
%time res = list(map(fake_external_io, urls))
```

```
CPU times: user 28.2 ms, sys: 19 ms, total: 47.1 ms
Wall time: 46.2 s
```

```python
dv.scatter('urls', urls).get()
```

```
[None, None, None, None]
```

```python
# Naive assignment: each engine maps over its own scattered chunk of urls
%time %px results = list(map(fake_external_io, urls))
```

```
CPU times: user 11.8 ms, sys: 10.1 ms, total: 21.9 ms
Wall time: 14.2 s
```

```python
# Load-balanced version
%time res = p_fake_external_io.map(urls).get()
```

```
CPU times: user 277 ms, sys: 53.8 ms, total: 330 ms
Wall time: 13 s
```


This isn't a perfect example, but you get the idea. The larger the number of inputs to your parallel problem and the more variable the run time of each component process, the more time you can save by switching to a load-balanced view.

This only scratches the surface of the ipyparallel project. I highly recommend taking a look at the docs. Here is a list of further topics I would look into if you are interested:

- support for numpy memmap to allow engines located on a single node to share large arrays
- complex dependencies and more specialized scheduling
- retry and recovery logic
- multiple clients working on the same cluster, allowing remote collaborators to share an environment


## Non-trivial Use Cases

Our team at Activision largely uses IPython clusters for distributed model training. This project has been vital for hyperparameter searches for our machine learning models. Being able to easily parallelize these searches beyond one machine, across hundreds of cores, has sped up training dramatically.
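A grid search parallelizes so well because every grid point is an independent evaluation. The sketch below shows that structure with a hypothetical `score` function standing in for "train a model and return a validation score"; the hyperparameter names and values are made up, and a local thread pool stands in for the cluster. On an IPython cluster, the `pool.map` call would become something along the lines of `lview.map_sync(score, grid)`.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


def score(params):
    """Hypothetical objective standing in for 'train a model, return a validation score'."""
    learning_rate, depth = params
    return -(learning_rate - 0.1) ** 2 - (depth - 6) ** 2


# Every combination of the (hypothetical) hyperparameter values
grid = list(itertools.product([0.01, 0.1, 1.0], [2, 4, 6, 8]))

# Grid points are independent, so they can be evaluated in any order on any worker
with ThreadPoolExecutor(max_workers=4) as pool:
    scores = list(pool.map(score, grid))

best_score, best_params = max(zip(scores, grid))
print(best_params)  # (0.1, 6)
```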

## OK, this is cool, but I want more cores!!!

The examples so far are a useful introduction to the API and some features of ipyparallel. Hopefully you are convinced to try out the library. However, deploying a working cluster beyond a single machine introduces some issues.

ipyparallel provides support for a range of cluster and batch job management systems, such as PBS and Windows HPC. The full list is provided in the documentation. ipyparallel also provides an SSH-based launcher: given passwordless SSH access to your machines, you can easily deploy engines and connect them to your controller and client. There is also a wonderful project, [StarCluster](http://star.mit.edu/cluster/docs/latest/plugins/ipython.html), that helps spin up machines from cloud providers.

These tools are great. If you have access to existing HPC clusters, or are planning on deploying dedicated clusters either on your own cold-iron (the 2016 version of bare-metal) or in the cloud, then they meet your needs.

However, we are big users of Mesos, Docker and Marathon to manage our clusters and services. Furthermore, even with the existing launchers, managing complex dependencies within the engines is a pain. Using Docker to package all dependencies makes deploying heterogeneous clusters easier. Targeting our existing cluster management system and simplifying dependency management is a big win for us.

With this in mind, we are open sourcing a new ipyparallel launcher that deploys IPython clusters into Mesos using Docker and Marathon. The code lives [here](https://github.com/ActivisionGameScience/ipyparallel-mesos/) and on pypi/conda as `ipyparallel_mesos`.

We have two prebuilt Docker images for the [Controller](https://hub.docker.com/r/jdennison/ipyparallel-marathon-controller/) and [Engine](https://hub.docker.com/r/jdennison/ipyparallel-marathon-engine/). These are stripped-down Docker images. Internally we use conda for almost all our dependencies, even inside our Docker containers. Please visit our public [conda recipes](https://github.com/ActivisionGameScience/ags_conda_recipes) and [channel](https://anaconda.org/ActivisionGameScience). Either way, extending from the `ipyparallel-marathon-engine` image will allow you to easily install your custom dependencies with or without conda.

The project is young, but hopefully you will find it useful. Please note that it currently targets Python 3. PRs are welcome to support older versions of Python (it's 2016; we can now refer to 2.7 as old). Please open any issues on the GitHub page, and read the project's README for more details.




## More Resources

- [ipyparallel_mesos](https://github.com/ActivisionGameScience/ipyparallel-mesos/)
- [StarCluster](http://star.mit.edu/cluster/docs/latest/plugins/ipython.html)
- [Great IPython Parallel Course - Day 3](https://github.com/jupyter/ngcm-tutorial/tree/master/Day-3)