# <span id="chap_parallel"></span> Working at scale

So far so good: we've looked at a simple kind of complex network, and a simple kind of complex process. We've shown how we can analyse them analytically and simulate them, using the interactive capabilities of IPython to control computations, gather the results, produce graphs, and then make the whole lot available on the web.

If you've taken the opportunity to look at any of the research papers mentioned in the text so far, however, you may have noticed something that makes you slightly uncomfortable. Simulations need to be run several times for different parameter values to even out the stochastic nature of the network/process interactions. Complex network effects often only work on large networks: in theory as $N \rightarrow \infty$, in practice with "lots" of nodes and edges. Both these factors combine to generate a *lot* of computation: $10^3$ repeated simulation runs across a space of two parameters over networks of $10^6$ nodes would not be especially unusual. Performing a simulation like this on a typical desktop workstation or laptop is clearly not going to work: even if the individual simulation runs only take a minute, ten thousand of them is about a week of compute time, and if we do that for thirty different parameter values &ndash; well, you see the problem.

This has been a persistent narrative in computer science, of course, and it's usually tackled by appealing to [Moore's law](https://en.wikipedia.org/wiki/Moore%27s_law), the notion that the amount of computing power available at a given price doubles roughly every eighteen months. However, the programs we commonly write are sequential, and the Moore's law curve is flattening out for individual processor cores, reducing the speed-up available to sequential programs over time. Extra speed-up comes, not from faster processor cores, but from having more cores available, often on the same processor: but taking advantage of this means *parallel* processing with multiple "threads" of activity happening simultaneously.

To do research-grade simulation of complex networks and complex processes, then, requires that we tackle high-performance parallel computing. While the general techniques of parallel programming are quite arcane, there are specific techniques that are easy to use both conceptually and (to some extent) technically &ndash; and fortunately most of network science simulation fits into these special cases. 

In this chapter we explore the approaches we need to work with large networks. We start with the concepts of parallel systems as far as we need to understand them for our purposes. We'll then talk about using IPython's parallel tools, which turn out to be well-suited to the sorts of computations we want to perform, and which mean we can utilise high-performance computing from within the same interactive environment we've been using up to now. We'll look at two separate approaches to getting parallel processing: clusters (both local and remote), and cloud computing. In between we'll re-visit the issues of reproducibility that we explored [earlier](simulate.ipynb#sec_repeatability_small), since these are becoming increasingly important to conducting science at scale. 

## <span id="sec_thinking_parallel"></span> Thinking parallel

Parallel computing simply means that a *single* program does *several* things *simultaneously*. (That's the easy part over with: it gets harder from here.)

Writing programs is a process of writing algorithms, either from scratch or by mashing-up existing libraries to provide the functionality you want. Most algorithms &ndash; and most programs built from them &ndash; are designed to run *sequentially*. When designed around arrays, for example, they might start with the first element and process it, then move to the second, and so forth: one element at a time in a particular order. This is an easy, straightforward, and above all easily-comprehensible way to describe an algorithm, and it's taught to all programmers from when they first start programming. But it's not the only way to express computation.

This "reductionist" style of programming is sometimes referred to as the *von Neumann style*. [John von Neumann](https://en.wikipedia.org/wiki/John_von_Neumann) was another giant in the history of mathematics and computing, and his design for computers &ndash; the *von Neumann architecture* consisting of a single central processing unit connected to memory, disc, and other peripherals &ndash; has influenced all the computers ever built. Although modern machines don't strictly follow von Neumann's design at the hardware level, they typically take great pains to *behave* as though they do, and it's the mental model programmers typically have of the way their computer is organised. The problem is that this forces algorithms to run sequentially because that's how they're explicitly written. This limitation has been called the *von Neumann bottleneck* &ndash; although that's unfair to von Neumann, who intended his architecture as a reference model rather than as a design for real machines that would persist for sixty-odd years.

There is another style of programming, however, that focuses on describing how a program deals with data structures as a whole. Rather than write loops that traverse arrays one element at a time, this style provides functions that (for example) apply the *same* operation to *all* elements of the array in one go, or that reduce all the elements through repeated application of some binary operator. The programmer writes her program to manipulate the entire array in one go using these "bulk" operators. Internally the bulk operations might be written as loops, as in the von Neumann style, but &ndash; critically &ndash; they *might* be written to work in parallel. Algorithms written in this style therefore aren't inherently sequential (although they might be realised that way): they have the *opportunity* for parallelism.
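
As a small illustration of the two kinds of bulk operator just described, here is a sketch using Python's built-in `map` and `functools.reduce`. The names `square`, `xs` and so on are purely illustrative, and both operators happen to run sequentially here: the point is only that nothing in the way they're *used* commits us to that.

```python
from functools import reduce
import operator

xs = [1, 2, 3, 4, 5]

def square(x):
    '''Return the square of a number.'''
    return x * x

# von Neumann style: visit the elements one at a time, in order
squares_loop = []
for x in xs:
    squares_loop.append(square(x))

# bulk style: apply the same operation to all the elements "in one go"
# (list() is needed under Python 3, where map() returns an iterator)
squares_bulk = list(map(square, xs))

# bulk reduction: combine all the elements using a binary operator
total = reduce(operator.add, squares_bulk)
```

The loop and the `map` produce the same list, but only the loop spells out an order in which the work has to happen.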

(This style is often associated with functional programming, but that's misleading. Functional programming is concerned with lack of side effects, amongst other concerns, which do make the "bulk" style easier to work with, but aren't necessary for it. It's perfectly possible to apply bulk operations in traditional or object-oriented languages with side effects. There are plenty of reasons to adopt functional languages and a functional style, but parallelism isn't really a very good one: you can get the same benefits in a sufficiently rich imperative language, and keep access to a better range of libraries.)

The point about bulk operators is that they identify things that can happen simultaneously without interacting with each other, like applying a function independently to each element of an array. There will be other parts of a program that have to happen in a particular order. In order to apply our function in parallel to an array, for example, we first have to create the array, and afterwards we might have to print out a result. We can't apply the function until we've created the array; we can't print the result until we've done the calculation. So we have a sequential program (create array, compute, print results) that embeds a parallel compute stage within which we do as much as we can in parallel. If we have a multicore machine, the parallel compute stage might make use of all the cores in the machine, with each core performing an independent part of the computation. If we have 16 cores we can do 16 parts together and, all things being equal, the computation will only take 1/16th as long as it would take as a sequential algorithm running on a single core. The more of the program we can push into bulk operations executed as a parallel stage, the more opportunities we have for parallelism and therefore the more opportunities for speeding-up the program overall. This approach is a form of *explicit* parallelism, in the sense that we'll be identifying those computations we want to happen in parallel and coding them slightly differently to the way we code the sequential parts of programs.
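
To put some numbers on this, here is a back-of-the-envelope sketch. It assumes the parallel stage divides perfectly across the cores and that there are no communication or scheduling overheads, neither of which is quite true in practice; the function `ideal_time` and the timings are invented for illustration.

```python
def ideal_time(sequential, parallel, cores):
    '''Estimate the running time of a program with a sequential part and
    a perfectly parallelisable part, ignoring all overheads.

    sequential: time spent in the stages that must run in order
    parallel: time the parallel stage would take on one core
    cores: number of cores available to the parallel stage
    returns: the idealised total time'''
    return sequential + parallel / float(cores)

# a parallel stage that would take 160 minutes on one core...
one_core = ideal_time(0, 160, 1)
sixteen_cores = ideal_time(0, 160, 16)    # 1/16th as long

# ...plus 10 minutes of sequential set-up and reporting that don't shrink
with_setup = ideal_time(10, 160, 16)
```

With these made-up figures the parallel stage drops from 160 minutes to 10, but the whole program takes 20 minutes rather than 170: a speed-up of 8.5 rather than 16, which is why it pays to push as much as possible into the parallel stage.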

This is the way we'll think about parallel programming in this book: as essentially sequential programs that contain computational stages consisting of applying the *same* basic computation to *multiple* pieces of data in parallel on as many cores as we have available. In most cases the computations that interest us will be network simulations, and we'll get parallelism by performing the *multiple runs* of the *same* simulation code across *different* networks. Each network can be simulated independently; potentially each repetition could be simulated simultaneously; and we'll look at how to get as much speed-up as we can from these parallel opportunities.

## <span id="sec_ipython_parallel"></span> Native IPython parallelism

IPython has some useful parallel programming constructs built-in. They're so useful and so integrated into the notebook interface that they make a perfect place to start &ndash; and indeed to stop, for many applications.

### The IPython parallel computing architecture

Modern machines are typically multicore, where the single central processor is composed of several independent "cores" that can run independent programs. Getting all these cores working simultaneously is the key to getting parallel speed-up. You can keep cores busy by running several different programs together, or by having parallel sections to programs that can each take a core. It's important to remember that you, as a user or as a programmer, never have to worry about assigning programs to cores. The operating system does that automatically. Your job, in order to get parallel speed-up, is to make jobs available to be scheduled onto the different cores.

IPython comes with a parallel computing library built-in. This library is actually very general and provides for several different parallel processing "idioms", but there's one that's especially useful for network science simulations so we'll focus exclusively on that. If you ever need something more flexible, or want to understand parallel computing more generally, there is plenty of material available: a good on-line tutorial is [[GPRK11]](bibliography.ipynb#GPRK11), while [[Ros13]](bibliography.ipynb#Ros13) also covers the material in some depth.

When we talk about a "computing architecture" we mean the components that go together to form the computing solution: the "mental map" a programmer needs in order to use the system. An architecture consists of both *physical* and *logical* elements. The physical elements are a set of machines connected by one or more interconnects: I'll use the term "interconnect" to refer to communication links between machines, instead of the more usual term "network", for reasons that are hopefully obvious! Typically we have a number of machines being used for computing, and we'll refer to these machines as a *cluster*. Each machine in the cluster will have one or more cores that can be used for computing. Within the cluster there is a single machine that manages the compute services being provided, and we'll refer to this machine as the *cluster head*. The other machines are dedicated to providing compute services, and we refer to these as *workers*. Outside the cluster there will be one or more *client* machines being used by scientists, which may be desktop or laptop machines, and may be powerful in their own right.

The logical elements are a set of processes running on the physical elements. Each client will have one or more IPython processes to manage the interface with the users, and we'll refer to these processes generically as *notebooks* even though they might actually be just scripts. On the cluster head there is a single *controller* that acts as the gateway to the cluster's compute resources. On the cluster workers there are a number of *engines* that connect to the controller. How many engines? Typically one engine per core, although in some circumstances (such as when you're sharing your cluster with other users) you might start fewer. (There's never any reason to start more engines on a machine than it has cores.)

Each process in the logical architecture is its own IPython process, and that's important. The data structures and functions you create in your notebook aren't automatically available to the engines. You have to import or communicate code to the engines in order for them to run it; you have to store data in files, or pass it over the interconnect, in order for them to have something to process. These differences can be fiddly, and we'll deal with them in some depth below.

To recap, the notebook views the cluster through the controller on the cluster head, which is connected to engines running on cluster workers, with there usually being as many engines available as the cluster workers have cores. The idea is that work generated in the notebook will be executed by the engines, and there are enough engines to soak-up all the computing resources available in the cluster. 

<div class=figure id=fig_ipython_architecture>
<div class=figurebody>
<img alt="IPython parallel computing architecture" src="ipython-parallelism.svg">
<br>
<span class=caption>The IPython parallel architecture. Processes map to different machines. Multicore workers will have several engines</span>
</div>
</div>

This is the most general layout for IPython parallel computing. There are some important variants, the most important of which are the *local cluster* that we'll describe next, and the *remote clusters* that we return to [below](#sec_remote_cluster).

### <span id="sec_local_cluster"></span> Local clusters

Suppose you have a single laptop or workstation. Modern machines are increasingly powerful, and even personal machines are becoming multicore, so you can get respectable, if limited, parallel processing even when you don't have access to a cluster. If you run programs just from a single notebook or script, you don't get any speed-up from having a multicore machine: to get it, you have to set up a cluster just for your own machine. In this case the components of the cluster reside *all on the same machine* at the physical level, but still have the same logical architecture.

<div class=figure id=fig_ipython_local_cluster>
<div class=figurebody>
<img alt="IPython parallelism on a single machine" src="ipython-local-parallelism.svg">
<br>
<span class=caption>A local cluster. The logical architecture, and therefore the view from software, remains the same as for the general case.</span>
</div>
</div>

The important point to remember is that, from a software perspective, a local cluster is identical to a larger, more powerful one. You can therefore develop code locally and then port it to a larger machine with reasonable confidence that you've ironed out the inevitable bugs.

As we observed above, a local cluster is where the entire logical architecture runs on a single machine. The simplest such cluster has a single controller and a single engine, and can be started with a single command:

In [7]:
%%bash --bg

ipcluster start

Starting job # 3 in a separate thread.


This will start a controller and one engine, and link them together ready for use. If your workstation has multiple cores, you can start multiple engines to keep them all busy. On a quad-core machine you might run:

In [None]:
%%bash --bg

ipcluster start -n 4

which starts a controller and four engines. That's it! The cluster is now ready for use. When you want to stop it, just use: 

In [5]:
%%bash --bg

ipcluster stop

Starting job # 2 in a separate thread.


### <span id="sec_parallel_ipython_programming"></span> How to write (and run) programs in parallel

Let's write our first parallel program.

As we observed above, IPython's computing model is explicitly parallel, meaning we identify where we want parallelism to happen by using special code. The simplest way to use IPython for parallelism is to make use of the "parallel `map`". The `map` function in Python takes a function and applies it to each element of a list, returning a new list consisting of the values that function returns when applied to each element in turn. This works for any function that takes a single argument. For example we can define a factorial function:

In [20]:
def factorial( n ):
    '''Return the factorial of the number given.
    
    n: the number
    returns: the factorial of that number'''
    
    f = 1
    for i in range(1, n + 1):
        f = f * i
    return f

We can then create a list of numbers and apply `factorial` to each element:

In [28]:
ns = [ 2, 6, 10, 20, 30, 40 ]
map(factorial, ns)

[2,
 720,
 3628800,
 2432902008176640000,
 265252859812191058636308480000000L,
 815915283247897734345611269596115894272000000000L]

`map()` is an example of what's called a *higher-order function*: it takes another function (`factorial()` in this case) as argument and makes use of it (applying it in turn to all the elements of its other argument). A useful way to think of this is that `map()` defines a computational pattern &ndash; "apply a function to each element of a list" &ndash; that is then filled-in with the desired function and list. The pattern of computation that `map()` gives rise to is "the same" in all cases, although the details depend on the function being mapped.

The built-in `map()` happens to apply the function sequentially and in order to the list elements, but that's just a detail: we could define a "parallel map" that applies the function to each element of the list in parallel, and (if these applications happened on different cores) the result would come back faster. This is the essence of the simplest form of parallelism supported by IPython, and this is the style we'll use almost exclusively in this book.

We first need to load the appropriate IPython support library: 

In [2]:
from IPython.parallel import Client

The `Client` object represents the connection between the notebook and the cluster controller. If we've started a local cluster as above, we can connect to it very simply:

In [8]:
cluster = Client()

The `Client` object has a number of methods and other useful features available. We can check how many engines are available, for example:

In [9]:
print "Local cluster has {n} engines".format(n = len(cluster[:]))

Local cluster has 4 engines


To work with these engines, we need two things. Firstly, as mentioned above, each engine runs in its own IPython process: it doesn't automatically have access to the code and data running in the notebook. So we need to ensure that we import everything we need into the engines' IPython instances. Our current example is so simple that we don't actually need anything, but suppose we needed `numpy`. We could import it into all the engines in the cluster with one command: 

In [29]:
with cluster[:].sync_imports():
    import numpy

importing numpy on engine(s)


Secondly, we need a "view" onto the cluster. Views in IPython are objects that control how jobs are allocated to engines. There are several different kinds, but for our purposes we just need the *load-balanced* view that allocates a job to an engine whenever it is free:

In [30]:
view = cluster.load_balanced_view()

The view contains operations for sending work to the cluster. In particular, it includes a "parallel map" function that applies a function to elements of a list in parallel. What does this mean? The map function will send each engine an element of the list and the function to apply to it. It will collect all the results as they come back and assemble them into a result list, just like the built-in map function. If there are more elements in the list than there are engines in the cluster (which would normally be the case), it will get all the engines working and, when an engine finishes computing for one element, it will send it another. This is what load balancing means: the view creates a set of jobs for the parallel map &ndash; one per list element &ndash; and sends each one to the first free engine. The parallel map finishes when all the individual jobs have finished, and the jobs run in parallel to the extent that there are free engines available.
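
To see why handing each job to the first free engine is worthwhile, here is a toy simulation of that policy. It is emphatically not IPython's scheduler, just a sketch of the idea: the function `load_balance` and the job durations are invented for illustration, and time is measured in arbitrary units.

```python
import heapq

def load_balance(durations, engines):
    '''Simulate handing jobs, in order, to whichever engine is free first.

    durations: how long each job takes (arbitrary time units)
    engines: the number of engines in the cluster
    returns: (assignment, finish) where assignment[i] is the engine
             that ran job i and finish is when the last job completed'''
    # each entry is (time at which the engine becomes free, engine number)
    free_at = [(0, e) for e in range(engines)]
    heapq.heapify(free_at)

    assignment = []
    finish = 0
    for d in durations:
        t, e = heapq.heappop(free_at)     # the first engine to become free
        assignment.append(e)
        finish = max(finish, t + d)
        heapq.heappush(free_at, (t + d, e))
    return assignment, finish

# one long job and five short ones, on two engines
assignment, finish = load_balance([5, 1, 1, 1, 1, 1], 2)
```

Here engine 0 takes the long job while engine 1 works through all five short ones, so everything is finished after 5 time units. Had we simply given each engine half the list in advance, the engine holding the long job would not have finished until time 7.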

Putting this together we can perform our factorial map in parallel:

In [34]:
view.map_sync(factorial, ns)

[2,
 720,
 3628800,
 2432902008176640000,
 265252859812191058636308480000000L,
 815915283247897734345611269596115894272000000000L]

This is identical to the result above, which is the point: built-in `map` and `map_sync` on the cluster view are essentially identical from the programmer's perspective, with the latter being faster if there's a lot of work to be done. We could apply a much more complicated function in exactly the same way and have it farmed-out to parallel engines. This is explicitly parallel in the sense that we've instructed IPython to use parallelism on the cluster: *we* made the decision, not IPython. However, IPython's parallelism library takes care of all the mechanics of dividing-up the map into a collection of parallel jobs and sending them to the cluster.

### <span id="sec_ipython_parallel_mechanics"></span> How IPython clusters work

It's worth understanding how the cluster is going to work under the bonnet, at least at a superficial level.

We've mentioned a couple of times that each engine (and the notebook, and the controller) is running in a separate IPython process, which implies that all code and data either have to be imported into each interpreter or passed to it somehow. We saw the former process happening above with `sync_imports()`; the latter is hidden inside the `map_sync()` call.

<div class=figure id=fig_ipython_mechanics>
<div class=figurebody>
<img alt="IPython message exchange" src="ipython-mechanics.svg">
<br>
<span class=caption>The mechanics of IPython parallelism.</span>
</div>
</div>

When we call `map_sync()`, IPython generates a number of jobs, one per list element. The jobs consist of applying the function being mapped to one list element, and each one is independent of the others and can be executed in parallel. Each job is given a *message id* that will identify it within the system, and is sent to the cluster controller. The controller queues-up the jobs and sends them, one at a time, to a free engine. As engines finish their jobs, the controller saves the results against the corresponding message id. Once all the message ids have results &ndash; that is, all the component jobs have been executed &ndash; the controller returns these results to the notebook. The notebook assembles the results into a result list in the right order and returns them to the user program.

The same cluster might be being used by several notebooks, so the controller might be dealing with several independent streams of jobs. Engines might be of different speeds, and so some jobs might complete more quickly than others. Or the jobs themselves might be of different sizes, some much easier than others. *None of that matters.* The view and controller will take care of the mechanics. As far as the programmer is concerned, calling `map_sync()` causes some computation to occur exactly as would happen when calling `map()`, but hopefully faster.
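
The bookkeeping described above can be mimicked in a few lines of ordinary, sequential Python. This is only a sketch of the idea, not how IPython is implemented: `toy_map_sync` is a made-up name, the message ids are just random strings, and the "unpredictable" completion order is faked with a seeded shuffle.

```python
import random
import uuid

def toy_map_sync(f, xs, seed=0):
    '''Mimic the bookkeeping of a parallel map, sequentially.

    f: the function to apply
    xs: the list of arguments
    seed: seed for the (pretend) order in which jobs complete
    returns: the list of results, in the same order as xs'''
    # notebook side: one job per element, each with its own message id
    jobs = [(uuid.uuid4().hex, x) for x in xs]
    order = [mid for (mid, _) in jobs]

    # controller side: jobs complete in an unpredictable order, and each
    # result is saved against the message id of the job that produced it
    pending = list(jobs)
    random.Random(seed).shuffle(pending)
    results = {}
    for (mid, x) in pending:
        results[mid] = f(x)       # an engine would do this work

    # notebook side: reassemble the results in the original order
    return [results[mid] for mid in order]

doubled = toy_map_sync(lambda x: 2 * x, [2, 6, 10, 20])
```

However the jobs are shuffled, `doubled` comes back in the same order as the input list, because results are looked up by message id rather than by arrival order.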

### Consequences

The IPython architecture is simple and straightforward, but does have some important characteristics that affect how we write software that's to run in parallel. In this section we list a few "rules" for more effective IPython parallel programming. These are "rules", not rules, because there's no sense in which breaking them is bad: in particular circumstances, they're exactly the *wrong* way to go. However, they've proven to be good in many circumstances and so are worth at least breaking deliberately rather than by accident.

**Test a single simulation first.** This may seem obvious, but make sure a simulation works *once* before trying to run it in parallel. Few things are more frustrating than having a whole load of parallel jobs fail because of a simple fault that testing would have picked up.

**Some things don't travel.** This is actually quite a subtle issue. The IPython engines are separate processes from the client. If we run a function in an engine, then the function and all its data have to be passed from the client, over the network, to the engine &ndash; even if both are on the same machine. This is accomplished by pickling the data in the same way as you'd pickle data to store it on disc.

Not all types pickle, however: notably, the standard pickler can't pickle function closures. We can get around this by using a better pickler like `dill`, but there are still some types that can't be transported. Often they're buried deep in libraries: for example, you can't send a `scipy` random variable from a notebook to an engine. This means that idioms you might use on a single machine can be problematic in a parallel scenario. You can minimise the problems caused by distribution by sticking with simple data types that `dill` (at least) can handle: classes, function closures, basic types, lists and dictionaries of these types, most elements of `numpy`.
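
You can check the first of these claims without a cluster at all. The sketch below uses only the standard `pickle` module; `make_adder` and `travels` are hypothetical helpers written for this example, and the exact exception raised for a closure differs between Python versions, which is why several are caught.

```python
import pickle

def make_adder(k):
    '''Return a closure that adds k to its argument.'''
    def add(x):
        return x + k
    return add

def travels(obj):
    '''Test whether the standard pickler can serialise an object.'''
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

simple = {'n': 1000, 'ps': [0.01, 0.02, 0.05]}
simple_ok = travels(simple)             # basic types and containers
closure_ok = travels(make_adder(3))     # a function closure
round_trip = pickle.loads(pickle.dumps(simple))
```

The dictionary of basic types survives the round trip unchanged, while the closure is rejected by the standard pickler.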

This handles the mechanics of transporting data &ndash; don't try to transport something complicated &ndash; but doesn't solve the problem of letting us actually use all the types we have available. For this we need the following "rule" too:

**Separate the simulation logic from the setup logic.** When writing a simulation, write the actual network implementation to be as clean as possible. That's the simulation logic. When it comes to setting-up and running the simulations, have another function that sets-up the simulations, possibly on an engine. This is the setup logic, and it is the place to deal with the nastiness caused by distribution.

Why does this make sense? The simulation logic is the complicated, scientific bit that it's important to be able to understand, maintain, and trust. The setup logic is just engineering, to make a simulation run in a particular environment. By localising the details needed to deal with distribution in the setup logic we can keep the simulation clean and simple. That's much better design than having lots of parallelism-specific code strung through simulations.

We'll see examples of this design approach below.
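
In the meantime, here is a minimal sketch of the shape this separation might take. Everything in it is an assumption made for illustration: `mean_degree` stands in for a real simulation, `run_one` is a hypothetical setup function, the network is a simple random graph built with the standard `random` module, and the built-in `map` stands in for a parallel map.

```python
import random

def mean_degree(n, edges):
    '''Simulation logic: the "science", knowing nothing about clusters.
    (A stand-in computation: the mean degree of a network.)

    n: the number of nodes
    edges: a list of (i, j) pairs
    returns: the mean degree'''
    return 2.0 * len(edges) / n

def run_one(params):
    '''Setup logic: build everything the simulation needs from simple,
    easily-transported values, then call the simulation logic.

    params: a dict with keys 'n', 'p' and 'seed'
    returns: a dict of the parameters plus the result'''
    rng = random.Random(params['seed'])
    n, p = params['n'], params['p']

    # build the network here, where the job runs, rather than shipping it
    edges = [(i, j) for i in range(n)
                    for j in range(i + 1, n)
                    if rng.random() < p]

    result = dict(params)
    result['mean_degree'] = mean_degree(n, edges)
    return result

# only small dicts of basic types would need to cross the interconnect
space = [{'n': 50, 'p': p, 'seed': s} for p in (0.1, 0.5) for s in range(3)]
results = list(map(run_one, space))     # built-in map as a stand-in
```

The design choice is that `run_one` receives and returns nothing but basic types, so it is the only function that needs to care about what will and won't travel, while `mean_degree` stays free of any parallelism-specific code.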

[PUT THINGS INTO CLOSURES TO BE CALLED SERVER-SIDE]

## <span id="sec_remote_cluster"></span> Setting up a remote cluster

Running on a local workstation soon becomes untenable when trying to perform research-grade simulations: the sizes of the networks and the number of repetitions needed simply overwhelm a single machine, and even if you upgrade you'll soon be wanting more power. There are two solutions:

1. Use a computing cluster
1. Use a cloud computing provider

In this section we'll deal with running on a cluster, and [then](#sec_cloud_computing) look at running in the cloud.

### Cluster computing

Until very recently, cluster computing was the most common form of high-performance computing. While they're rapidly being superseded by cloud services, clusters remain a useful way to get more performance cheaply from local resources.

Clusters come in two flavours: *dedicated* clusters built for the purpose, and *workstation* clusters that make use of "spare" computing power available from machines on people's desks or in labs when they're not using them. Both kinds run a bewildering range of software. The most common use Linux, with [Rocks](http://www.rocksclusters.org) being very popular. The concepts are all similar, though, whatever flavour you have available.

[REMOTE CLUSTER ARRANGEMENTS: SHARED FILES, DEDICATED VS NoW]

### Planning a remote cluster

The nice thing about using a system like IPython is that it isolates you, as far as possible, from the underlying mechanics of cluster-building. (This won't be true later when we talk about [cloud computing](#sec_cloud_computing), where you typically end up committed to a particular service provider very early on. This is yet another attraction of using clusters.) From IPython's perspective, you simply attach to a cluster controller (an instance of `ipcontroller` running somewhere) and then fire compute jobs at it. What could possibly go wrong?

Quite a lot, as it turns out. There are essentially three problems we need to deal with.

Firstly, we have to set up the controller and engines so that they all talk to each other within the cluster. "Talking" in this case means both that the controller and the engines can route packets between each other using a network, and that they have permission to interact: a network issue and a security issue, in other words. There are lots of ways a cluster's network can be set up, but a common configuration has the engines and controller on the same local-area network (LAN) and able to communicate directly. The client may be on the same LAN, or may be on another that's accessible, or the controller may be multihomed and sit on both the client's and the engines' networks simultaneously using two different network interfaces.

Secondly, we have to get software and data to the engines. In dedicated clusters the engines (and the controller) often share a common file system mounted *via* NFS or similar. In networks of workstations, the machines will often all have independent file systems and we'll have to handle synchronising software and data to them.

Thirdly, we have to let the IPython client talk to the cluster controller. This may be equally easy, but is often more difficult, since client and cluster may live on different networks. Even if they do happen to be directly visible to each other, though, you might decide to go on the road outside your LAN &ndash; to work from home, for example &ndash; and have to access the cluster across a firewall. Since this is a more common case, we'll assume that this is the situation you're facing: a cluster that's local to itself, but on the other side of a firewall from your client machine at least some of the time. In most cases this means accessing the cluster over an `ssh` tunnel.

### `ssh` tunnels

If you're familiar with `ssh`, it will be sufficient to tell you to generate a keypair, install it on the cluster machines, and use `ssh-agent` when setting up the cluster &ndash; and you can now skip the rest of this section.

If, on the other hand, that made no sense to you, read on.

[EXPLAIN ssh KEYPAIRS, TUNNELS, ssh-agent]

### Configuring a remote cluster

Getting IPython to use a remote cluster is basically just like getting it to use a local one, except that we need to get the cluster machines talking, tell the client where the cluster controller is, and make sure everything is accessible. Fortunately this is all fairly easy. It's easiest if we start at the cluster side and then progress to the client.

All the things we need to set are set from an IPython profile, so we'll do that first. On the cluster head, we'll first create an IPython profile:

<tt>
ipython profile create <i>cluster</i> --parallel
</tt>

where *cluster* is the name of the profile, which seems fairly sensible in general but you might want to name your profile after the name of the cluster it describes. (In St Andrews I use the *blob* cluster, so my profile is called *blob*.) As before, this creates all the configuration files we need. We do, however, need to edit things slightly.

We need to edit the file `.ipython/profile_cluster/ipcontroller_config.py` (assuming our profile was called *cluster*). Opening this file in an editor will reveal a load of commented-out Python code that sets various parameters that control the behaviour of the controller. Almost all of these can be left as they are for most installations, but we need to make the following modifications by uncommenting variables and setting their values:

1. Change `c.IPControllerApp.ssh_server` to the name of the machine that hosts the `ssh` server we'll use to access the cluster from the client. Typically this is the public name of the cluster head *as seen from outside*.
2. Change `c.IPControllerApp.reuse_files` to `True`. We'll explain this below. 
3. Change `c.HubFactory.ip` to  `u'*'`. This will cause the controller to listen on all network interfaces for engines.
4. Change `c.IPControllerApp.location` to the name of the cluster head *as seen by engines*.

What do we mean by the names of machines as seen by the client and by engines? In many network set-ups the worker nodes in a cluster are on their own network, and may know about only a small set of machines &ndash; so you couldn't access machines over the internet from a worker. The cluster head node, by contrast, is typically accessible more or less publicly, and so will have a public name. (In my case the worker nodes think the cluster head is called `blob-cs.local` while the outside world thinks it's called `blob.cs.st-andrews.ac.uk`, the machine I can `ssh` into. So these two machine names would go into `c.IPControllerApp.location` and `c.IPControllerApp.ssh_server` respectively.)
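Taken together, the four edits might look like the following fragment of `ipcontroller_config.py`. This is only a sketch: the two host names are the ones from the set-up just described and need replacing with your own, and the `c = get_config()` line should already be present at the top of the file that IPython generated.

```python
# .ipython/profile_cluster/ipcontroller_config.py (fragment)
c = get_config()   # normally already present at the top of the generated file

# 1. the ssh server that clients tunnel through: the head node as seen from outside
c.IPControllerApp.ssh_server = u'blob.cs.st-andrews.ac.uk'

# 2. keep the same connection files across controller restarts
c.IPControllerApp.reuse_files = True

# 3. listen for engines on all network interfaces
c.HubFactory.ip = u'*'

# 4. the head node as seen by the engines
c.IPControllerApp.location = u'blob-cs.local'
```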

That's it as far as the cluster is concerned. We now need to configure the client-side. On the machine running IPython we create a profile again, which may or may not have the same name as the cluster-side one: it makes sense for them to have the same name most of the time, and it will probably cause endless confusion if you don't, but you don't *have* to use the same name. The edits to this profile are simpler: we just need to tell the client where the cluster head is:

1. Change `c.IPControllerApp.ssh_server` to the public name of the cluster head as seen from outside.

(This is the same change as step 1 on the cluster side.) That's it! For most cluster set-ups where the workers and head nodes share a filing system, that's all we have to do.

### Starting the cluster

We're now ready to start the cluster. When we did this earlier we simply started everything from the IPython notebook. We can't do this for a remote cluster, as a typical workflow is to start the cluster and leave it running, connecting from outside as required. The cluster topology is also more complicated than a local cluster, in that the controller is accessed by multiple engines, each living on a different machine: this is after all the point, to get access to lots of machines' computing power simultaneously.

The mechanism is very simple. We first start a controller on the head node, then start engines on the worker nodes, and finally ship some security information to the client to let it connect.

Starting the controller is simple. On the head node, just run:

<tt>
(ipcontroller --profile=cluster &)
</tt>

(Again assuming *cluster* is the name of your profile.) This puts the controller into the background: you'll see some status information appearing in the terminal, that can usually just be used to provide confidence that something is happening.

How you start the engines depends on the cluster management software, and can range from having to log-in to all the machines individually to running some more automated process. If we assume we're using Rocks, we can do the following:

<tt>
rocks run host command='(ipengine --profile=cluster &)'
</tt>

Much status information will appear, again hopefully just confirming things are working. At this stage we hopefully have a cluster controller running on the head node, connected to engines sitting ready for work on the worker nodes.

[ADD SOME SCREENSHOTS OF ALL THIS]

[EXPLAIN HOW TO DO THIS MANUALLY]

[TROUBLESHOOTING]

### Connecting to the cluster

Finally we need to connect to the controller from IPython. We need a final step to make the security work. Why security? IPython will not let just *anyone* run jobs on a cluster, even if they can log-in to the cluster head. In order to use a cluster you need a security capabilities file that describes the connection protocols and also provides a shared secret that your code presents to the cluster controller.

When the controller starts, it stores connection metadata as `.ipython/profile_cluster/security/ipcontroller-client.json`. We copy this file from the cluster head to the client. We don't need to create a complete IPython profile client-side as we did before (although it does no harm to do so): we just need the security file to be accessible. If we open a connection to the cluster, IPython will use the profile description to find the cluster head, log-in using `ssh`, authenticate itself to the controller, and create the necessary connection ready to use the engines.

Phew!

You may be happy to learn that we don't have to go through this process every time. The reason for this is the slightly mysterious line in the cluster's profile configuration: step 2 above where we did `c.IPControllerApp.reuse_files = True`. If we kill the cluster (or it dies for some reason), when we re-start the controller it will re-use the same key file, which means that authentication from existing clients will still work. However, if you edit the cluster's configuration for any reason, you'll need to remove the key files, let the controller create new ones, and then copy these down to the client. Once the system is set up and known to be working, though, this very seldom happens.

### An example of how to connect

Let's assume things go right. We've downloaded the `.ipython/profile_cluster/security/ipcontroller-client.json` file locally so it sits alongside our IPython notebook. We can fire-up a connection to the cluster using code like the following:

In [None]:
from IPython.parallel import Client

cluster = Client(url_file = 'ipcontroller-client.json')

Compare this to what we had [before](#sec_parallel_ipython_programming) and you'll see that the only change is to provide a link to the security file, which contains all the information we need to get the connection going under most circumstances.

If you have a slightly more complex set-up, you may need a little more work. If, for example, your username on your local workstation is different to that on the cluster, you need to edit `ipcontroller-client.json` to add your cluster-side username. If you open the file with a text editor it'll look something like this:

<code>
{
  "control": 38243, 
  "task": 50807, 
  "notification": 56186, 
  "task_scheme": "leastload", 
  "mux": 41161, 
  "iopub": 45302, 
  "ssh": "blob.cs.st-andrews.ac.uk", 
  "key": "1e57d010-afe9-49a9-af68-65f2cd027c81", 
  "registration": 54884, 
  "interface": "tcp://127.0.0.1", 
  "signature_scheme": "hmac-sha256", 
  "pack": "json", 
  "unpack": "json", 
  "location": "12.34.56.78"
}
</code>

The line `"ssh": "..."` is the connection point for the machine. If your username on your cluster is `john`, you'd change the file to:

<code>
{
...
  "ssh": "john@blob.cs.st-andrews.ac.uk", 
...
}
</code>
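If you'd rather not edit the file by hand, the same change can be scripted. The following is a minimal sketch using only Python's standard `json` module; the function name `add_ssh_username` is made up for illustration and is not part of IPython.

```python
import json

def add_ssh_username(path, username):
    """Prefix the "ssh" entry of a connection file with a username, if it lacks one."""
    with open(path) as handle:
        info = json.load(handle)

    server = info.get('ssh', '')
    if server and '@' not in server:
        # only touch entries that don't already name a user
        info['ssh'] = '{u}@{s}'.format(u=username, s=server)

    with open(path, 'w') as handle:
        json.dump(info, handle, indent=2)
    return info.get('ssh')
```

Calling `add_ssh_username('ipcontroller-client.json', 'john')` rewrites the file in place and leaves every other entry, including the key, untouched.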

Another common possibility is that you've had to generate an `ssh` keypair independently for your cluster. If the private key for this is stored in `john_rsa`, for example, you'd provide this to your connection code along with the security file:

In [None]:
cluster = Client(url_file = 'ipcontroller-client.json',
                 sshkey = 'john_rsa')

These deal with most of the scenarios you're likely to encounter, both with clusters and (as we'll see later) clouds. Anything more complicated can usually be resolved by careful perusal of the documentation for IPython, bearing in mind [the way the architecture works](#sec_ipython_parallel_mechanics).

## Programming for a remote cluster

The incantations above will hopefully set up a remote cluster that has exactly the same interface as the local cluster we set up earlier. While this is a huge benefit &ndash; we don't have to commit to using a particular set-up when writing our code &ndash; things aren't quite that simple. 

There are essentially two problems we need to deal with. Firstly, the purpose of using a cluster is to get performance, and we only get maximum performance if all the engines are kept busy computing all the time. Even if we step back a little from this ideal position, we need to keep the engines *as busy as possible doing useful computing* if we're to get the performance boost that a cluster is able to give us. We need to structure code to this end, which means thinking about how to keep engines evenly supplied with work.

Secondly, the remote cluster is, well, *remote*, in the sense that there is a network between the client and the cluster head, and potentially between the cluster head and the worker nodes. Modern networks are fast, but modern simulations can be large, and even a fast network can start to collapse under the pressure of moving lots of data around. Sending lots of data can slow things down substantially, especially as an engine that's exchanging data isn't doing useful computation. So our thoughts about code structure need to deal with two related phenomena: keeping engines fed with work, and keeping them doing that work rather than talking over the network. We'll deal with these two questions in the next two sections.

### <span id="sec_parallel_work"></span> Keeping engines supplied with work

Keeping engines supplied with work comes in two phases. IPython provides half, but the programmer needs to provide the other.

IPython's half is provided by the view we take of the cluster. When we first discussed IPython parallelism [above](#sec_ipython_parallel) we used both *direct* and *load-balanced* views. A direct view lets the programmer send particular jobs to particular engines in the cluster, which is useful if you want each engine to do something different. However, often we want the engines doing the *same* thing but over different data, and this is where a load-balanced view comes in. A load-balanced view takes a set of jobs and allocates them to engines *as they become free*. Suppose we do the following: 

In [None]:
rc = view.map(f, range(1000000))

What does this do? If `view` is a direct view, the range is divided up front into equal-sized pieces, one per engine, and each engine applies `f` to every value in its own piece. That's fine if every call to `f` takes about the same time; if not, engines that happen to get quick pieces finish early and then sit idle while the others catch up.

But what if `view` is load-balancing? Suppose we have a reasonably-sized cluster with 128 engines. The load-balanced view will take `f` and apply it in parallel to the first 128 elements of the range, computing `f(0)`, `f(1)`, and so on up to `f(127)`. These calculations might take radically different amounts of time. When an engine finishes and returns a result, the view gives it the next calculation &ndash; `f(128)` in this case &ndash; to do, and similarly as other engines complete. This all happens transparently to the programmer, so as far as you're concerned you map `f` over the list and get speed-up from parallel execution.

There is an implication here, though. If an engine finishes its job, it can only keep busy &ndash; keep contributing useful work &ndash; if it can receive another job. The load-balanced view handles the mechanics of this, but the programmer has to supply the jobs. In some cases jobs all take roughly the same time; in others jobs make radically different computational demands. The ideal is where every engine is kept 100% busy and they all finish their final job at roughly the same time; in reality, we can keep the engines busy until the last jobs have been submitted to an engine, and work will trail off as these jobs are finished and no new ones are allocated.
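The difference that allocating jobs "as engines become free" makes can be seen in a toy calculation that needs no cluster at all. The sketch below uses made-up job durations, drawn from an exponential distribution purely as an assumption to get a wide spread, and compares dealing the jobs out in fixed shares up front with handing each job to the next free engine. The function names are illustrative only, and the model ignores all communication costs.

```python
import heapq
import random

def static_makespan(durations, engines):
    """Deal jobs out in equal-sized contiguous blocks up front; return the finish time."""
    size = -(-len(durations) // engines)          # ceiling division
    blocks = [durations[i:i + size] for i in range(0, len(durations), size)]
    return max(sum(block) for block in blocks)

def dynamic_makespan(durations, engines):
    """Give each job to whichever engine becomes free first; return the finish time."""
    free_at = [0.0] * engines                     # time at which each engine is next free
    heapq.heapify(free_at)
    for d in durations:
        start = heapq.heappop(free_at)            # the earliest-free engine takes the next job
        heapq.heappush(free_at, start + d)
    return max(free_at)

random.seed(1)
jobs = [random.expovariate(1.0) for _ in range(10000)]   # made-up, highly variable job times
print('fixed shares up front: {0:.1f}'.format(static_makespan(jobs, 128)))
print('next free engine:      {0:.1f}'.format(dynamic_makespan(jobs, 128)))
```

With job times as variable as these, the second figure should come out noticeably smaller than the first: no engine is left holding a long queue while others have nothing to do.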

From a programming perspective, this suggests structuring programs to have a large number of small jobs: a large number so there is always work available, and small to allow more efficient scheduling. One might, for example, decide to run network simulations where a job is a single run &ndash; a network with a process to run over it &ndash; and perform hundreds (or thousands) of repetitions of each set of parameters being tested.

While this is good in principle from this perspective, in practice it has some disadvantages that we'll now consider.

### <span id="sec_parallel_chunking"></span> Locality and chunking

Suppose we have the following scenario:

* We want to simulate a process on a network under two parameters (say $a$ and $b$)
* We want to test $a$ and $b$ over a range of ten data points each, so over all pairs of $(a_0, b_0), (a_0, b_1), \ldots , (a_0, b_9), (a_1, b_0), \ldots , (a_9, b_9)$
* Because the process is stochastic, we decide to perform 10000 repetitions of the experiment at each parameter pair, generating a different network for each
* To make things reliable, we'll use a network of 100000 nodes for each test

Using what we learned above, we could proceed as follows. We define a function for the process dynamics for each pair of parameters. Then, for each pair of parameters, we build a list consisting of 10000 instances of 100000-node networks with the appropriate degree distribution and edges. Then we map the process function across the networks using a load-balanced view. Looking at the numbers, we end up with 100 simulation functions, each mapped across a list of 10000 elements, each of which is a 100000-node network: $10^6$ compute jobs.

Does this meet the criteria we set? Well, yes: lots of jobs, each of which, while not small, is about as small as we can make it under the circumstances (a simulation across a single network). So is this a good solution to the problem that we could code-up?

No.

Why not? Basically for three reasons:

1. the memory of the client;
1. the volume of traffic over the network; and
1. the management overhead.

Let's deal with the problems first, starting with the client's memory, and then move to some possible solutions. To run the simulations for a given pair of parameters, we have to build 10000 instances of a 100000-node network. That's going to use a considerable amount of memory, since it's $10^9$ nodes with their associated edges. A slightly different programming approach, and we'd store all the networks for *all* the parameter combinations: $10^{11}$ nodes, with edges. That's a *lot* of storage.
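The figures quoted so far are easy to check. The few lines below simply redo the arithmetic for the scenario, taking 10000 repetitions per parameter pair (the figure on which the totals in the text are based); the variable names are ours.

```python
points_per_parameter = 10
repetitions = 10000           # per parameter pair
nodes_per_network = 100000

pairs = points_per_parameter ** 2                  # every (a, b) combination
jobs = pairs * repetitions                         # one job per simulation run
nodes_per_pair = repetitions * nodes_per_network   # held at the client for one pair
nodes_all_pairs = pairs * nodes_per_pair           # if every network is built up front

print('{0} parameter pairs, {1} jobs'.format(pairs, jobs))
print('{0:.0e} nodes per pair, {1:.0e} nodes in total'.format(float(nodes_per_pair),
                                                              float(nodes_all_pairs)))
```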

Suppose our client machine can cope: OK, we now have to spin-up the simulation jobs, which means passing the network data over the network to the controller, and then to an engine. The controller has to acquire and store the data, and then retain it until it can be passed to an engine. Both of these require communicating a 100000-node network between two machines. That's a *lot* of data movement.

While this is happening the client, controller, and engines all have to maintain and exchange information in the background to keep things working. We have to keep the work flowing, possibly monitoring when jobs complete and sending a new job to the now-free engine. Finally, we have to return whatever metrics we derive at the end of the simulation back to the client for processing.

We can argue about whether this particular scenario would work on a particular hardware set-up, and the notion of something being "too big" will change over time, but I think it's safe to say that we have too much of a good thing here: too many jobs, too much network traffic, too much overhead, too much memory being used in one place. Each of these "too much"es is a recipe for a failure that stops things working and loses data.

But didn't we do everything right? Yes and no. Conceptually we did indeed do the right thing. We split up our initial problem into a load of independent computations that could be performed in parallel and then aggregated. But we then simply implemented that solution na&iuml;vely, without considering the reality of the situation. We were on the right track, however: having the right conceptual framework is half-way to a solution, and what we need to do is realise this framework in a more realistic manner.

There are two techniques we can use to improve our implementation. Firstly, we can think about *where computation occurs*, and specifically about where we build the networks we're going to use: do they *have* to be passed between machines, or can we build them where they'll be used? Secondly, we can "chunk" jobs together and schedule a smaller number of larger jobs: still enough to keep the engines busy, but not enough to cause problems.

Let's use these two techniques in our scenario. Conceptually we have a simulation function and a network it runs over. We could have the simulation function build the network itself and then work on it. That sounds equivalent, and indeed it is, at one level: that's really the point. But implementationally, in the first case we created functions and networks at the client and then communicated them to an engine *via* the controller; in the second case we created functions at the client and communicated them, and they then built their own network locally at the engine: no communication needed. We've immediately reduced the client's memory requirement and the amount of data communicated &ndash; and, incidentally, speeded things up by allowing the non-trivial task of creating the large networks to happen in parallel at the engines that will then use them.

We've still got a lot of jobs. Currently our simulation function performs a single simulation. We could re-code it to perform all 10000 repetitions for a given pair of parameters, building a new random network locally for each repetition. Now instead of $10^6$ jobs (10000 repetitions each of 100 parameter pairs), we have 100 jobs each of which performs 10000 repetitions. Moreover, since the reason we're doing the repetitions is probably to compute average values for the various metrics, we could do *that* computation at the engine too and just ship back the average values, rather than the values for each repetition (since we don't care about them). Again, we've reduced communications, *and* speeded things up by performing parallel calculations, *and* saved computation at the client since the code returns averages not raw data needing further processing. 
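As a sketch of what such a re-coded simulation function might look like, the following performs all the repetitions for one parameter pair, builds each network locally, and returns only the average. The network generator and the process here are deliberately trivial stand-ins invented for illustration, as is the layout of the `job` tuple; a real simulation would substitute its own network construction and dynamics. The sizes used at the end are tiny so that the code can be tried out on a laptop.

```python
def run_chunk(job):
    """Run every repetition for one parameter pair at the engine; return only the average."""
    import random   # imported inside the function so the import also happens at the engine

    a, b, nodes, repetitions = job

    def build_network(n, p_edge):
        # stand-in network generator: each possible edge is present with probability p_edge
        neighbours = dict((i, set()) for i in range(n))
        for i in range(n):
            for j in range(i + 1, n):
                if random.random() < p_edge:
                    neighbours[i].add(j)
                    neighbours[j].add(i)
        return neighbours

    def run_process(neighbours, p_active):
        # stand-in process: a node with at least one neighbour activates with
        # probability p_active; the metric is the fraction of nodes that activate
        active = sum(1 for i in neighbours
                     if neighbours[i] and random.random() < p_active)
        return float(active) / len(neighbours)

    total = 0.0
    for _ in range(repetitions):
        network = build_network(nodes, a)    # built where it is used, never sent anywhere
        total += run_process(network, b)
    return (a, b, repetitions, total / repetitions)

# tried out locally and synchronously on a tiny problem before going near a cluster
jobs = [(a, b, 50, 20) for a in (0.05, 0.1) for b in (0.2, 0.8)]
averages = list(map(run_chunk, jobs))
print(averages)
```

Only the four-element job descriptions travel to wherever `run_chunk` executes, and only one small tuple per job comes back. On a cluster the built-in `map` would be replaced by the `map` of a load-balanced view.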

But we need to pause a moment: 100 jobs does not sound like very many. If we had a cluster with 128 engines &ndash; not unreasonable at the time of writing &ndash; then we only have enough work for 100 of them: we leave around 20% of the engines idle. On the other hand, if we only have 64 engines, we can keep them all busy for longer. If we had 128 engines, maybe we'd want to build smaller chunks &ndash; two jobs doing 5000 repetitions each for each parameter pair, perhaps? &ndash; to give ourselves 200 jobs to spread across the available machines. Alternatively we could increase the number of parameters, maybe explore $a$ and $b$ over 20 points each, yielding 400 jobs each of 10000 repetitions. (If that sounds a bit gratuitous, in reality we often let problems expand to fill the computers we have available: a sort of computational version of [Parkinson's law](https://en.wikipedia.org/wiki/Parkinson%27s_law).)
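The trade-off between the number of jobs and their size can be turned into a parameter. The sketch below, in which all the names and the tuple layouts are our own assumptions, builds the list of job descriptions for a given number of chunks per parameter pair, and recombines per-chunk averages into one average per pair, weighting each chunk by the number of repetitions it performed.

```python
def make_jobs(a_values, b_values, nodes, repetitions, chunks_per_pair=1):
    """Describe the work as (a, b, nodes, repetitions_in_chunk) tuples, one per job."""
    base, extra = divmod(repetitions, chunks_per_pair)
    jobs = []
    for a in a_values:
        for b in b_values:
            for k in range(chunks_per_pair):
                # spread any remainder over the first few chunks
                jobs.append((a, b, nodes, base + (1 if k < extra else 0)))
    return jobs

def combine(chunk_results):
    """Merge (a, b, repetitions, average) results into one overall average per (a, b)."""
    totals = {}
    for a, b, reps, average in chunk_results:
        s, n = totals.get((a, b), (0.0, 0))
        totals[(a, b)] = (s + average * reps, n + reps)
    return dict((pair, s / n) for pair, (s, n) in totals.items())

points = range(10)
print(len(make_jobs(points, points, 100000, 10000)))                      # one job per pair
print(len(make_jobs(points, points, 100000, 10000, chunks_per_pair=2)))   # two jobs per pair
```

Changing `chunks_per_pair` is then the only adjustment needed to fit the same experiment to a larger or smaller cluster.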

Let's recapitulate the journey we've just made. We started with a scientific problem to solve. We built a conceptual model that divided-up the problem into lots of independent jobs to solve independently and then combine together. We then tweaked this model to make it realistic, taking account of the limitations of real computers and their interconnects, and arrived at a model that is tailored to fit the actual computational environment we have available. We can now code this up and run it. It's important to note that the steps we've taken haven't really tied us down too much: we'd need to re-code the simulation slightly and provide appropriate values for the number of repetitions and the like, but fundamentally the final code is very much like our initial formulation. Later we'll see how to do this in practice.

## <span id="sec_cloud_computing"></span> Cloud computing

## Some more programming techniques

As will now be abundantly apparent, performing research-grade simulation is very time-consuming even if you have access to research-grade computing power &ndash; and many of us don't, at least a lot of the time. In any event, running a large simulation from IPython will take a long time. Using the techniques we've used above, we're locked-out of the notebook while the simulation is on-going. We can't carry on writing in the notebook, or doing other useful work. Worse, we can't disconnect the notebook from the cluster for the duration of the simulation, as doing so will lose the data we've spent so much effort computing. This is bad for workstation users &ndash; the machine can't be re-booted, a crash will lose data, and so on &ndash; but it's *really* bad for laptop users, who can't head off on the road. And let's face it, most scientists these days use laptops as personal machines: indeed, one of the great things about compute clusters is that you don't *need* a big, dedicated machine on your desk.

Fortunately, there is an alternative. It involves starting computing jobs *asynchronously*, without waiting for them to complete as happens normally. We can then disconnect from the cluster while it gets on with doing our simulations, and then re-connect later to pick up the results. Doing this requires a small change in the set-up of the cluster we're using, and a small change in the way we code the client-side of our computation, but no changes at all in the simulation part.

First, what does "asynchronous" mean? [TO BE COMPLETED]

### Setting up a cluster for asynchronous simulation

So asynchronous operation means that we submit jobs to a controller and then disconnect, returning later to collect the results. This implies that the controller we connect to remembers the results of calculations until we come back for them. Moreover, re-connection means that, from the controller's perspective, a *new* client is asking for the results of a *different* client's computations. 

The default set-up for a controller is a little too simple for this approach: it assumes that a client asks for its own results, and forgets uncollected results when clients disconnect. To enable the behaviour we want, we need to set things up so that the controller remembers results. As a side benefit, this also makes the controller more robust to engine failures.

We discussed above how each IPython compute job is bound to a message with a message identifier. The tweak we'll make to the controller's set-up is to have it store the message ids and their results in a database that persists over time. This means that a client can request the result of a specific message and retrieve it from the database (if it's been computed yet), wait for it (if it hasn't), or query whether it's ready or not and come back later.
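To make the idea concrete, here is a toy illustration of a persistent store keyed by message id, written with Python's built-in `sqlite3` module. It is emphatically not IPython's own implementation or database schema: the class, its method names, and the table layout are all invented for this sketch, which only shows the three operations just described (retrieve a result, find out that it isn't there yet, or ask whether it's ready).

```python
import pickle
import sqlite3

class ToyResultStore(object):
    """Toy persistent store mapping message ids to results (not IPython's own schema)."""

    def __init__(self, filename):
        self.db = sqlite3.connect(filename)
        self.db.execute('CREATE TABLE IF NOT EXISTS results '
                        '(msg_id TEXT PRIMARY KEY, result BLOB)')
        self.db.commit()

    def submitted(self, msg_id):
        # remember a job that has been accepted but has no result yet
        self.db.execute('INSERT OR IGNORE INTO results VALUES (?, NULL)', (msg_id,))
        self.db.commit()

    def completed(self, msg_id, result):
        # store the pickled result against its message id
        blob = sqlite3.Binary(pickle.dumps(result))
        self.db.execute('INSERT OR REPLACE INTO results VALUES (?, ?)', (msg_id, blob))
        self.db.commit()

    def ready(self, msg_id):
        # True only if the job is known and its result has arrived
        row = self.db.execute('SELECT result FROM results WHERE msg_id = ?',
                              (msg_id,)).fetchone()
        return row is not None and row[0] is not None

    def get(self, msg_id):
        # fetch a stored result; raises KeyError if it is not ready yet
        if not self.ready(msg_id):
            raise KeyError(msg_id)
        row = self.db.execute('SELECT result FROM results WHERE msg_id = ?',
                              (msg_id,)).fetchone()
        return pickle.loads(bytes(row[0]))
```

Because the results live in a file rather than in the memory of a particular connection, a second `ToyResultStore` opened later on the same file sees them too, which is exactly the property a re-connecting client relies on.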

IPython can use a number of different databases to manage its message store. The simplest is to use SQLite, support for which is built-in to Python. You can also use MongoDB or other database providers for special circumstances, but for the purposes of this book we'll stick to SQLite as it's proved robust and scalable enough.

Making use of SQLite as the message store involves changing a single configuration parameter. We again edit `.ipython/profile_cluster/ipcontroller_config.py` (assuming our profile is called *cluster*) and make a single edit:

1. Change `c.HubFactory.db_class` to `IPython.parallel.controller.sqlitedb.SQLiteDB`.

Starting the cluster will then create a database called `.ipython/profile_cluster/tasks.db` that will be populated with incoming messages and their computed results.

### Submitting asynchronous jobs

[map_async AND SO ON]

[MESSAGES IDS]

[STORING MESSAGE IDS IN A FILE FOR LATER USE]
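Pending a fuller treatment, the essential steps can be sketched as follows: submit the work with `map_async()`, which returns an `AsyncResult` immediately rather than waiting, take the message ids from its `msg_ids` attribute, and pickle them to a file. The helper name `submit_and_record` is hypothetical, as are `simulate` and `parameter_list` in the usage comment; the file name `jobs.pickle` is simply the one read back when checking progress.

```python
import pickle

def submit_and_record(view, f, data, filename='jobs.pickle'):
    """Start f over data without waiting, and save the message ids for later."""
    pending = view.map_async(f, data)        # returns at once with an AsyncResult
    msg_ids = list(pending.msg_ids)          # one identifier per submitted message
    with open(filename, 'wb') as handle:
        pickle.dump(msg_ids, handle)
    return msg_ids

# Typical use, assuming a running cluster and the profile name used in this chapter:
#
#   from IPython.parallel import Client
#   cluster = Client(profile = 'cluster')
#   view = cluster.load_balanced_view()
#   jobs = submit_and_record(view, simulate, parameter_list)
#   cluster.close()
```

Once the ids are safely on disc the client can close its connection; nothing about the submitted jobs depends on the notebook staying open.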

### Checking progress

Now we have a set of compute jobs executing on our cluster, and a set of message identifiers corresponding to those jobs. We need to be able to check whether the jobs have finished, and then collect the results. Both these tasks can be performed easily from inside an IPython notebook, without blocking.

Suppose our computations have been running for some time, and we want to check on them. The following code will accomplish this:

In [None]:
import pickle
from IPython.parallel import Client

# re-connect back to the cluster
cluster2 = Client(profile = 'cluster')

# re-load the message identifiers
with open('jobs.pickle', 'rb') as handle:
    jobs = pickle.load(handle)

# query job state: True for each job whose result is ready
status = [ cluster2.get_result(msg).ready() for msg in jobs ]
print('{n} jobs submitted, {c}% complete'.format(n = len(status),
                                                 c = int(100.0 * sum(status) / len(status))))

# tidy up
cluster2.close()

We first re-connect to the cluster and re-load the message ids of our jobs. We then use `get_result()` to present the message ids one by one to the cluster. `get_result()` returns an `AsyncResult` object, a Python object that represents the current state of an asynchronously-executed job. Calling `ready()` on this object tests whether the job's result is ready or not, so we end up with a list of ready values, `True` for jobs that have completed and `False` for those that are still on-going. We then present these as a percentage complete. Closing the connection to the cluster is quite important, as otherwise the underlying Python process can run out of file handles if you keep a notebook open for a long time and check often.

We can use this code to query the state of our computations. If they're not complete, go away and wait again. Eventually, all being well, all the results will be ready and we can retrieve them.

### Retrieving asynchronous results

Once everything is ready, we can use the `get()` method on `AsyncResult` to retrieve the results:

In [None]:
# retrieve the results
cluster2 = Client(profile = 'cluster')
results = cluster2.get_result(jobs).get()
cluster2.close()

This leaves the results of all the jobs associated with the message ids in a single list in `results`. `get()` is a synchronous method: it blocks until the results are available. We could call it anytime if we're willing to go synchronous, but in this case we already know that all the results are available, so we know that `get()` will successfully retrieve the results without waiting.

It's important to note three things at this point. Firstly, we can disconnect the client completely from the cluster. Since some simulations might take a week (or more), this is clearly a useful thing to be able to do, especially if you're a laptop-user. Secondly, to get this functionality, we've changed the set-up of the cluster and added some client code, but we haven't changed the simulation code at all: asynchronous operation in IPython is purely about how you *call* code, not about the *called* code at all. This isn't true of other computing frameworks, and is quite a benefit: it means that IPython simulations scale-out much more easily, since you can run them small and debug them locally and synchronously before running them asynchronously for the real task. Finally, the optimisations we identified to avoid moving data over the network, and to maximise parallelism, all work just as well with asynchronous operation as with synchronous, so moving to asynchronous coding just adds some more flexibility, again without having to re-think the simulation code too much.  

## <span id="sec_repeatability_large"></span> Some more notes on reproducibility

## Summary